// NetworkOPs.cpp
1#include <xrpld/app/consensus/RCLConsensus.h>
2#include <xrpld/app/consensus/RCLCxPeerPos.h>
3#include <xrpld/app/consensus/RCLValidations.h>
4#include <xrpld/app/ledger/AcceptedLedger.h>
5#include <xrpld/app/ledger/InboundLedgers.h>
6#include <xrpld/app/ledger/LedgerMaster.h>
7#include <xrpld/app/ledger/LedgerToJson.h>
8#include <xrpld/app/ledger/LocalTxs.h>
9#include <xrpld/app/ledger/OpenLedger.h>
10#include <xrpld/app/ledger/TransactionMaster.h>
11#include <xrpld/app/main/LoadManager.h>
12#include <xrpld/app/main/Tuning.h>
13#include <xrpld/app/misc/DeliverMax.h>
14#include <xrpld/app/misc/Transaction.h>
15#include <xrpld/app/misc/TxQ.h>
16#include <xrpld/app/misc/ValidatorKeys.h>
17#include <xrpld/app/misc/ValidatorList.h>
18#include <xrpld/app/misc/detail/AccountTxPaging.h>
19#include <xrpld/app/misc/make_NetworkOPs.h>
20#include <xrpld/app/rdb/backend/SQLiteDatabase.h>
21#include <xrpld/consensus/Consensus.h>
22#include <xrpld/consensus/ConsensusParms.h>
23#include <xrpld/core/ConfigSections.h>
24#include <xrpld/overlay/Cluster.h>
25#include <xrpld/overlay/Overlay.h>
26#include <xrpld/overlay/predicates.h>
27#include <xrpld/rpc/BookChanges.h>
28#include <xrpld/rpc/CTID.h>
29#include <xrpld/rpc/DeliveredAmount.h>
30#include <xrpld/rpc/MPTokenIssuanceID.h>
31#include <xrpld/rpc/ServerHandler.h>
32
33#include <xrpl/basics/UptimeClock.h>
34#include <xrpl/basics/mulDiv.h>
35#include <xrpl/basics/safe_cast.h>
36#include <xrpl/basics/scope.h>
37#include <xrpl/beast/utility/rngfill.h>
38#include <xrpl/core/HashRouter.h>
39#include <xrpl/core/NetworkIDService.h>
40#include <xrpl/core/PerfLog.h>
41#include <xrpl/crypto/RFC1751.h>
42#include <xrpl/crypto/csprng.h>
43#include <xrpl/git/Git.h>
44#include <xrpl/ledger/AmendmentTable.h>
45#include <xrpl/ledger/OrderBookDB.h>
46#include <xrpl/ledger/helpers/AccountRootHelpers.h>
47#include <xrpl/ledger/helpers/DirectoryHelpers.h>
48#include <xrpl/ledger/helpers/TokenHelpers.h>
49#include <xrpl/protocol/BuildInfo.h>
50#include <xrpl/protocol/Feature.h>
51#include <xrpl/protocol/MultiApiJson.h>
52#include <xrpl/protocol/NFTSyntheticSerializer.h>
53#include <xrpl/protocol/RPCErr.h>
54#include <xrpl/protocol/Rate.h>
55#include <xrpl/protocol/TxFlags.h>
56#include <xrpl/protocol/jss.h>
57#include <xrpl/resource/Fees.h>
58#include <xrpl/resource/ResourceManager.h>
59#include <xrpl/server/LoadFeeTrack.h>
60#include <xrpl/tx/apply.h>
61
62#include <boost/asio/ip/host_name.hpp>
63#include <boost/asio/steady_timer.hpp>
64
65#include <algorithm>
66#include <exception>
67#include <mutex>
68#include <optional>
69#include <set>
70#include <sstream>
71#include <string>
72#include <tuple>
73#include <unordered_map>
74
75namespace xrpl {
76
77class NetworkOPsImp final : public NetworkOPs
78{
84 {
85 public:
87 bool const admin;
88 bool const local;
90 bool applied = false;
92
94 : transaction(t), admin(a), local(l), failType(f)
95 {
96 XRPL_ASSERT(
98 "xrpl::NetworkOPsImp::TransactionStatus::TransactionStatus : "
99 "valid inputs");
100 }
101 };
102
106 enum class DispatchState : unsigned char {
107 none,
108 scheduled,
109 running,
110 };
111
113
129 {
137
141 std::chrono::steady_clock::time_point start_ = std::chrono::steady_clock::now();
142 std::chrono::steady_clock::time_point const processStart_ = start_;
145
146 public:
148 {
149 counters_[static_cast<std::size_t>(OperatingMode::DISCONNECTED)].transitions = 1;
150 }
151
158 void
160
166 void
167 json(Json::Value& obj) const;
168
176
177 CounterData
179 {
180 std::lock_guard const lock(mutex_);
182 }
183 };
184
187 {
188 ServerFeeSummary() = default;
189
191 XRPAmount fee,
192 TxQ::Metrics escalationMetrics, // trivially copyable
193 LoadFeeTrack const& loadFeeTrack);
194 bool
195 operator!=(ServerFeeSummary const& b) const;
196
197 bool
199 {
200 return !(*this != b);
201 }
202
207 };
208
209public:
211 ServiceRegistry& registry,
213 bool standalone,
214 std::size_t minPeerCount,
215 bool start_valid,
216 JobQueue& job_queue,
218 ValidatorKeys const& validatorKeys,
219 boost::asio::io_context& ioCtx,
220 beast::Journal journal,
221 beast::insight::Collector::ptr const& collector)
222 : registry_(registry)
223 , m_journal(journal)
226 , heartbeatTimer_(ioCtx)
227 , clusterTimer_(ioCtx)
229 , mConsensus(
230 registry_.get().getApp(),
232 setup_FeeVote(registry_.get().getApp().config().section("voting")),
233 registry_.get().getJournal("FeeVote")),
235 *m_localTX,
236 registry.getInboundTransactions(),
237 beast::get_abstract_clock<std::chrono::steady_clock>(),
238 validatorKeys,
239 registry_.get().getJournal("LedgerConsensus"))
240 , validatorPK_(
241 validatorKeys.keys ? validatorKeys.keys->publicKey : decltype(validatorPK_){})
243 validatorKeys.keys ? validatorKeys.keys->masterPublicKey
244 : decltype(validatorMasterPK_){})
246 , m_job_queue(job_queue)
247 , m_standalone(standalone)
248 , minPeerCount_(start_valid ? 0 : minPeerCount)
249 , m_stats(std::bind(&NetworkOPsImp::collect_metrics, this), collector)
250 {
251 }
252
253 ~NetworkOPsImp() override
254 {
255 // This clear() is necessary to ensure the shared_ptrs in this map get
256 // destroyed NOW because the objects in this map invoke methods on this
257 // class when they are destroyed
259 }
260
261public:
263 getOperatingMode() const override;
264
266 strOperatingMode(OperatingMode const mode, bool const admin) const override;
267
269 strOperatingMode(bool const admin = false) const override;
270
271 //
272 // Transaction operations.
273 //
274
275 // Must complete immediately.
276 void
278
279 void
281 std::shared_ptr<Transaction>& transaction,
282 bool bUnlimited,
283 bool bLocal,
284 FailHard failType) override;
285
286 void
287 processTransactionSet(CanonicalTXSet const& set) override;
288
297 void
298 doTransactionSync(std::shared_ptr<Transaction> transaction, bool bUnlimited, FailHard failType);
299
309 void
312 bool bUnlimited,
313 FailHard failtype);
314
315private:
316 bool
318
319 void
322 std::function<bool(std::unique_lock<std::mutex> const&)> retryCallback);
323
324public:
328 void
330
336 void
338
339 //
340 // Owner functions.
341 //
342
344 getOwnerInfo(std::shared_ptr<ReadView const> lpLedger, AccountID const& account) override;
345
346 //
347 // Book functions.
348 //
349
350 void
353 Book const&,
354 AccountID const& uTakerID,
355 bool const bProof,
356 unsigned int iLimit,
357 Json::Value const& jvMarker,
358 Json::Value& jvResult) override;
359
360 // Ledger proposal/close functions.
361 bool
363
364 bool
365 recvValidation(std::shared_ptr<STValidation> const& val, std::string const& source) override;
366
367 void
368 mapComplete(std::shared_ptr<SHAMap> const& map, bool fromAcquire) override;
369
370 // Network state machine.
371
372 // Used for the "jump" case.
373private:
374 void
376 bool
378
379public:
380 bool
381 beginConsensus(uint256 const& networkClosed, std::unique_ptr<std::stringstream> const& clog)
382 override;
383 void
385 void
386 setStandAlone() override;
387
391 void
392 setStateTimer() override;
393
394 void
395 setNeedNetworkLedger() override;
396 void
397 clearNeedNetworkLedger() override;
398 bool
399 isNeedNetworkLedger() override;
400 bool
401 isFull() override;
402
403 void
404 setMode(OperatingMode om) override;
405
406 bool
407 isBlocked() override;
408 bool
409 isAmendmentBlocked() override;
410 void
411 setAmendmentBlocked() override;
412 bool
413 isAmendmentWarned() override;
414 void
415 setAmendmentWarned() override;
416 void
417 clearAmendmentWarned() override;
418 bool
419 isUNLBlocked() override;
420 void
421 setUNLBlocked() override;
422 void
423 clearUNLBlocked() override;
424 void
425 consensusViewChange() override;
426
428 getConsensusInfo() override;
430 getServerInfo(bool human, bool admin, bool counters) override;
431 void
432 clearLedgerFetch() override;
434 getLedgerFetchInfo() override;
437 void
438 reportFeeChange() override;
439 void
441
442 void
443 updateLocalTx(ReadView const& view) override;
445 getLocalTxCount() override;
446
447 //
448 // Monitoring: publisher side.
449 //
450 void
451 pubLedger(std::shared_ptr<ReadView const> const& lpAccepted) override;
452 void
455 std::shared_ptr<STTx const> const& transaction,
456 TER result) override;
457 void
458 pubValidation(std::shared_ptr<STValidation> const& val) override;
459
460 //--------------------------------------------------------------------------
461 //
462 // InfoSub::Source.
463 //
464 void
465 subAccount(InfoSub::ref ispListener, hash_set<AccountID> const& vnaAccountIDs, bool rt)
466 override;
467 void
468 unsubAccount(InfoSub::ref ispListener, hash_set<AccountID> const& vnaAccountIDs, bool rt)
469 override;
470
471 // Just remove the subscription from the tracking
472 // not from the InfoSub. Needed for InfoSub destruction
473 void
474 unsubAccountInternal(std::uint64_t seq, hash_set<AccountID> const& vnaAccountIDs, bool rt)
475 override;
476
478 subAccountHistory(InfoSub::ref ispListener, AccountID const& account) override;
479 void
480 unsubAccountHistory(InfoSub::ref ispListener, AccountID const& account, bool historyOnly)
481 override;
482
483 void
484 unsubAccountHistoryInternal(std::uint64_t seq, AccountID const& account, bool historyOnly)
485 override;
486
487 bool
488 subLedger(InfoSub::ref ispListener, Json::Value& jvResult) override;
489 bool
490 unsubLedger(std::uint64_t uListener) override;
491
492 bool
493 subBookChanges(InfoSub::ref ispListener) override;
494 bool
495 unsubBookChanges(std::uint64_t uListener) override;
496
497 bool
498 subServer(InfoSub::ref ispListener, Json::Value& jvResult, bool admin) override;
499 bool
500 unsubServer(std::uint64_t uListener) override;
501
502 bool
503 subBook(InfoSub::ref ispListener, Book const&) override;
504 bool
505 unsubBook(std::uint64_t uListener, Book const&) override;
506
507 bool
508 subManifests(InfoSub::ref ispListener) override;
509 bool
510 unsubManifests(std::uint64_t uListener) override;
511 void
512 pubManifest(Manifest const&) override;
513
514 bool
515 subTransactions(InfoSub::ref ispListener) override;
516 bool
517 unsubTransactions(std::uint64_t uListener) override;
518
519 bool
520 subRTTransactions(InfoSub::ref ispListener) override;
521 bool
522 unsubRTTransactions(std::uint64_t uListener) override;
523
524 bool
525 subValidations(InfoSub::ref ispListener) override;
526 bool
527 unsubValidations(std::uint64_t uListener) override;
528
529 bool
530 subPeerStatus(InfoSub::ref ispListener) override;
531 bool
532 unsubPeerStatus(std::uint64_t uListener) override;
533 void
534 pubPeerStatus(std::function<Json::Value(void)> const&) override;
535
536 bool
537 subConsensus(InfoSub::ref ispListener) override;
538 bool
539 unsubConsensus(std::uint64_t uListener) override;
540
542 findRpcSub(std::string const& strUrl) override;
544 addRpcSub(std::string const& strUrl, InfoSub::ref) override;
545 bool
546 tryRemoveRpcSub(std::string const& strUrl) override;
547
548 void
549 stop() override
550 {
551 {
552 try
553 {
554 heartbeatTimer_.cancel();
555 }
556 catch (boost::system::system_error const& e)
557 {
558 JLOG(m_journal.error()) << "NetworkOPs: heartbeatTimer cancel error: " << e.what();
559 }
560
561 try
562 {
563 clusterTimer_.cancel();
564 }
565 catch (boost::system::system_error const& e)
566 {
567 JLOG(m_journal.error()) << "NetworkOPs: clusterTimer cancel error: " << e.what();
568 }
569
570 try
571 {
572 accountHistoryTxTimer_.cancel();
573 }
574 catch (boost::system::system_error const& e)
575 {
576 JLOG(m_journal.error())
577 << "NetworkOPs: accountHistoryTxTimer cancel error: " << e.what();
578 }
579 }
580 // Make sure that any waitHandlers pending in our timers are done.
581 using namespace std::chrono_literals;
582 waitHandlerCounter_.join("NetworkOPs", 1s, m_journal);
583 }
584
585 void
586 stateAccounting(Json::Value& obj) override;
587
588private:
589 void
590 setTimer(
591 boost::asio::steady_timer& timer,
592 std::chrono::milliseconds const& expiry_time,
593 std::function<void()> onExpire,
594 std::function<void()> onError);
595 void
597 void
599 void
601 void
603
605 transJson(
606 std::shared_ptr<STTx const> const& transaction,
607 TER result,
608 bool validated,
611
612 void
615 AcceptedLedgerTx const& transaction,
616 bool last);
617
618 void
621 AcceptedLedgerTx const& transaction,
622 bool last);
623
624 void
627 std::shared_ptr<STTx const> const& transaction,
628 TER result);
629
630 void
631 pubServer();
632 void
634
636 getHostId(bool forAdmin);
637
638private:
642
643 /*
644 * With a validated ledger to separate history and future, the node
645 * streams historical txns with negative indexes starting from -1,
646 * and streams future txns starting from index 0.
647 * The SubAccountHistoryIndex struct maintains these indexes.
648 * It also has a flag stopHistorical_ for stopping streaming
649 * the historical txns.
650 */
652 {
654 // forward
656 // separate backward and forward
658 // history, backward
661 bool haveHistorical_{false};
663
664 SubAccountHistoryIndex(AccountID const& accountId) : accountId_(accountId)
665 {
666 }
667 };
680
684 void
688 void
690 void
692
695
697
699
701
706
708 boost::asio::steady_timer heartbeatTimer_;
709 boost::asio::steady_timer clusterTimer_;
710 boost::asio::steady_timer accountHistoryTxTimer_;
711
713
716
718
720
723
725
727
728 enum SubTypes {
729 sLedger, // Accepted ledgers.
730 sManifests, // Received validator manifests.
731 sServer, // When server changes connectivity state.
732 sTransactions, // All accepted transactions.
733 sRTTransactions, // All proposed and accepted transactions.
734 sValidations, // Received validations.
735 sPeerStatus, // Peer status changes.
736 sConsensusPhase, // Consensus phase
737 sBookChanges, // Per-ledger order book changes
738 sLastEntry // Any new entry must be ADDED ABOVE this one
739 };
740
742
744
746
747 // Whether we are in standalone mode.
748 bool const m_standalone;
749
750 // The number of nodes that we need to consider ourselves connected.
752
753 // Transaction batching.
758
760
763
764private:
765 struct Stats
766 {
767 template <class Handler>
768 Stats(Handler const& handler, beast::insight::Collector::ptr const& collector)
769 : hook(collector->make_hook(handler))
771 collector->make_gauge("State_Accounting", "Disconnected_duration"))
772 , connected_duration(collector->make_gauge("State_Accounting", "Connected_duration"))
773 , syncing_duration(collector->make_gauge("State_Accounting", "Syncing_duration"))
774 , tracking_duration(collector->make_gauge("State_Accounting", "Tracking_duration"))
775 , full_duration(collector->make_gauge("State_Accounting", "Full_duration"))
777 collector->make_gauge("State_Accounting", "Disconnected_transitions"))
779 collector->make_gauge("State_Accounting", "Connected_transitions"))
780 , syncing_transitions(collector->make_gauge("State_Accounting", "Syncing_transitions"))
782 collector->make_gauge("State_Accounting", "Tracking_transitions"))
783 , full_transitions(collector->make_gauge("State_Accounting", "Full_transitions"))
784 {
785 }
786
793
799 };
800
801 std::mutex m_statsMutex; // Mutex to lock m_stats
803
804private:
805 void
807};
808
809//------------------------------------------------------------------------------
810
812 {"disconnected", "connected", "syncing", "tracking", "full"}};
813
815
822
823static auto const genesisAccountId =
825
826//------------------------------------------------------------------------------
827inline OperatingMode
829{
830 return mMode;
831}
832
833inline std::string
834NetworkOPsImp::strOperatingMode(bool const admin /* = false */) const
835{
836 return strOperatingMode(mMode, admin);
837}
838
839inline void
844
845inline void
850
851inline void
856
857inline bool
862
863inline bool
868
871{
872 static std::string const hostname = boost::asio::ip::host_name();
873
874 if (forAdmin)
875 return hostname;
876
877 // For non-admin uses hash the node public key into a
878 // single RFC1751 word:
879 static std::string const shroudedHostId = [this]() {
880 auto const& id = registry_.get().getApp().nodeIdentity();
881
882 return RFC1751::getWordFromBlob(id.first.data(), id.first.size());
883 }();
884
885 return shroudedHostId;
886}
887
888void
890{
892
893 // Only do this work if a cluster is configured
894 if (registry_.get().getCluster().size() != 0)
896}
897
/// Arm `timer` to fire after `expiry_time`.
/// `onExpire` runs on normal expiry (unless the job queue is stopping);
/// `onError` runs on any unexpected error so callers can re-arm the timer.
/// The completion handler is tracked by waitHandlerCounter_ so stop() can
/// wait for it; once that counter is joined, no new timer is started.
void
NetworkOPsImp::setTimer(
    boost::asio::steady_timer& timer,
    std::chrono::milliseconds const& expiry_time,
    std::function<void()> onExpire,
    std::function<void()> onError)
{
    // Only start the timer if waitHandlerCounter_ is not yet joined.
    if (auto optionalCountedHandler =
            waitHandlerCounter_.wrap([this, onExpire, onError](boost::system::error_code const& e) {
                if ((e.value() == boost::system::errc::success) && (!m_job_queue.isStopped()))
                {
                    onExpire();
                }
                // Recover as best we can if an unexpected error occurs.
                if (e.value() != boost::system::errc::success &&
                    e.value() != boost::asio::error::operation_aborted)
                {
                    // Try again later and hope for the best.
                    JLOG(m_journal.error())
                        << "Timer got error '" << e.message() << "'. Restarting timer.";
                    onError();
                }
            }))
    {
        timer.expires_after(expiry_time);
        timer.async_wait(std::move(*optionalCountedHandler));
    }
}
927
/// (Re)arm the consensus heartbeat for one ledger-granularity tick. On
/// expiry the real work is handed off to the job queue; on timer error we
/// simply re-arm and try again.
void
NetworkOPsImp::setHeartbeatTimer()
{
    auto const interval = mConsensus.parms().ledgerGRANULARITY;

    auto queueHeartbeatJob = [this]() {
        m_job_queue.addJob(jtNETOP_TIMER, "NetHeart", [this]() { processHeartbeatTimer(); });
    };
    auto rearmOnError = [this]() { setHeartbeatTimer(); };

    setTimer(heartbeatTimer_, interval, queueHeartbeatJob, rearmOnError);
}
939
/// (Re)arm the 10-second cluster status timer; expiry queues the cluster
/// report job, errors simply re-arm the timer.
void
NetworkOPsImp::setClusterTimer()
{
    using namespace std::chrono_literals;

    auto queueClusterJob = [this]() {
        m_job_queue.addJob(jtNETOP_CLUSTER, "NetCluster", [this]() { processClusterTimer(); });
    };
    auto rearmOnError = [this]() { setClusterTimer(); };

    setTimer(clusterTimer_, 10s, queueClusterJob, rearmOnError);
}
953
/// Schedule an account-history streaming job for `subInfo` to run in four
/// seconds; a timer error reschedules this same timer.
void
NetworkOPsImp::setAccountHistoryJobTimer(SubAccountHistoryInfoWeak subInfo)
{
    using namespace std::chrono_literals;

    JLOG(m_journal.debug()) << "Scheduling AccountHistory job for account "
                            << toBase58(subInfo.index_->accountId_);

    auto runJob = [this, subInfo]() { addAccountHistoryJob(subInfo); };
    auto rearmOnError = [this, subInfo]() { setAccountHistoryJobTimer(subInfo); };

    setTimer(accountHistoryTxTimer_, 4s, runJob, rearmOnError);
}
966
967void
968NetworkOPsImp::processHeartbeatTimer()
969{
970 RclConsensusLogger clog("Heartbeat Timer", mConsensus.validating(), m_journal);
971 {
972 std::unique_lock lock{registry_.get().getApp().getMasterMutex()};
973
974 // VFALCO NOTE This is for diagnosing a crash on exit
975 LoadManager& mgr(registry_.get().getLoadManager());
976 mgr.heartbeat();
977
978 std::size_t const numPeers = registry_.get().getOverlay().size();
979
980 // do we have sufficient peers? If not, we are disconnected.
981 if (numPeers < minPeerCount_)
982 {
983 if (mMode != OperatingMode::DISCONNECTED)
984 {
985 setMode(OperatingMode::DISCONNECTED);
987 ss << "Node count (" << numPeers << ") has fallen "
988 << "below required minimum (" << minPeerCount_ << ").";
989 JLOG(m_journal.warn()) << ss.str();
990 CLOG(clog.ss()) << "set mode to DISCONNECTED: " << ss.str();
991 }
992 else
993 {
994 CLOG(clog.ss()) << "already DISCONNECTED. too few peers (" << numPeers
995 << "), need at least " << minPeerCount_;
996 }
997
998 // MasterMutex lock need not be held to call setHeartbeatTimer()
999 lock.unlock();
1000 // We do not call mConsensus.timerEntry until there are enough
1001 // peers providing meaningful inputs to consensus
1002 setHeartbeatTimer();
1003
1004 return;
1005 }
1006
1007 if (mMode == OperatingMode::DISCONNECTED)
1008 {
1009 setMode(OperatingMode::CONNECTED);
1010 JLOG(m_journal.info()) << "Node count (" << numPeers << ") is sufficient.";
1011 CLOG(clog.ss()) << "setting mode to CONNECTED based on " << numPeers << " peers. ";
1012 }
1013
1014 // Check if the last validated ledger forces a change between these
1015 // states.
1016 auto origMode = mMode.load();
1017 CLOG(clog.ss()) << "mode: " << strOperatingMode(origMode, true);
1018 if (mMode == OperatingMode::SYNCING)
1019 {
1020 setMode(OperatingMode::SYNCING);
1021 }
1022 else if (mMode == OperatingMode::CONNECTED)
1023 {
1024 setMode(OperatingMode::CONNECTED);
1025 }
1026 auto newMode = mMode.load();
1027 if (origMode != newMode)
1028 {
1029 CLOG(clog.ss()) << ", changing to " << strOperatingMode(newMode, true);
1030 }
1031 CLOG(clog.ss()) << ". ";
1032 }
1033
1034 mConsensus.timerEntry(registry_.get().getTimeKeeper().closeTime(), clog.ss());
1035
1036 CLOG(clog.ss()) << "consensus phase " << to_string(mLastConsensusPhase);
1037 ConsensusPhase const currPhase = mConsensus.phase();
1038 if (mLastConsensusPhase != currPhase)
1039 {
1040 reportConsensusStateChange(currPhase);
1041 mLastConsensusPhase = currPhase;
1042 CLOG(clog.ss()) << " changed to " << to_string(mLastConsensusPhase);
1043 }
1044 CLOG(clog.ss()) << ". ";
1045
1046 setHeartbeatTimer();
1047}
1048
1049void
1050NetworkOPsImp::processClusterTimer()
1051{
1052 if (registry_.get().getCluster().size() == 0)
1053 return;
1054
1055 using namespace std::chrono_literals;
1056
1057 bool const update = registry_.get().getCluster().update(
1058 registry_.get().getApp().nodeIdentity().first,
1059 "",
1060 (m_ledgerMaster.getValidatedLedgerAge() <= 4min)
1061 ? registry_.get().getFeeTrack().getLocalFee()
1062 : 0,
1063 registry_.get().getTimeKeeper().now());
1064
1065 if (!update)
1066 {
1067 JLOG(m_journal.debug()) << "Too soon to send cluster update";
1068 setClusterTimer();
1069 return;
1070 }
1071
1072 protocol::TMCluster cluster;
1073 registry_.get().getCluster().for_each([&cluster](ClusterNode const& node) {
1074 protocol::TMClusterNode& n = *cluster.add_clusternodes();
1075 n.set_publickey(toBase58(TokenType::NodePublic, node.identity()));
1076 n.set_reporttime(node.getReportTime().time_since_epoch().count());
1077 n.set_nodeload(node.getLoadFee());
1078 if (!node.name().empty())
1079 n.set_nodename(node.name());
1080 });
1081
1082 Resource::Gossip const gossip = registry_.get().getResourceManager().exportConsumers();
1083 for (auto& item : gossip.items)
1084 {
1085 protocol::TMLoadSource& node = *cluster.add_loadsources();
1086 node.set_name(to_string(item.address));
1087 node.set_cost(item.balance);
1088 }
1089 registry_.get().getOverlay().foreach(
1090 send_if(std::make_shared<Message>(cluster, protocol::mtCLUSTER), peer_in_cluster()));
1091 setClusterTimer();
1092}
1093
1094//------------------------------------------------------------------------------
1095
1097NetworkOPsImp::strOperatingMode(OperatingMode const mode, bool const admin) const
1098{
1099 if (mode == OperatingMode::FULL && admin)
1100 {
1101 auto const consensusMode = mConsensus.mode();
1102 if (consensusMode != ConsensusMode::wrongLedger)
1103 {
1104 if (consensusMode == ConsensusMode::proposing)
1105 return "proposing";
1106
1107 if (mConsensus.validating())
1108 return "validating";
1109 }
1110 }
1111
1112 return states_[static_cast<std::size_t>(mode)];
1113}
1114
1115void
1116NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)
1117{
1118 if (isNeedNetworkLedger())
1119 {
1120 // Nothing we can do if we've never been in sync
1121 return;
1122 }
1123
1124 // Enforce Network bar for batch txn
1125 if (iTrans->isFlag(tfInnerBatchTxn) && m_ledgerMaster.getValidatedRules().enabled(featureBatch))
1126 {
1127 JLOG(m_journal.error()) << "Submitted transaction invalid: tfInnerBatchTxn flag present.";
1128 return;
1129 }
1130
1131 // this is an asynchronous interface
1132 auto const trans = sterilize(*iTrans);
1133
1134 auto const txid = trans->getTransactionID();
1135 auto const flags = registry_.get().getHashRouter().getFlags(txid);
1136
1137 if ((flags & HashRouterFlags::BAD) != HashRouterFlags::UNDEFINED)
1138 {
1139 JLOG(m_journal.warn()) << "Submitted transaction cached bad";
1140 return;
1141 }
1142
1143 try
1144 {
1145 auto const [validity, reason] = checkValidity(
1146 registry_.get().getHashRouter(), *trans, m_ledgerMaster.getValidatedRules());
1147
1148 if (validity != Validity::Valid)
1149 {
1150 JLOG(m_journal.warn()) << "Submitted transaction invalid: " << reason;
1151 return;
1152 }
1153 }
1154 catch (std::exception const& ex)
1155 {
1156 JLOG(m_journal.warn()) << "Exception checking transaction " << txid << ": " << ex.what();
1157
1158 return;
1159 }
1160
1161 std::string reason;
1162
1163 auto tx = std::make_shared<Transaction>(trans, reason, registry_.get().getApp());
1164
1165 m_job_queue.addJob(jtTRANSACTION, "SubmitTxn", [this, tx]() {
1166 auto t = tx;
1167 processTransaction(t, false, false, FailHard::no);
1168 });
1169}
1170
1171bool
1172NetworkOPsImp::preProcessTransaction(std::shared_ptr<Transaction>& transaction)
1173{
1174 auto const newFlags = registry_.get().getHashRouter().getFlags(transaction->getID());
1175
1176 if ((newFlags & HashRouterFlags::BAD) != HashRouterFlags::UNDEFINED)
1177 {
1178 // cached bad
1179 JLOG(m_journal.warn()) << transaction->getID() << ": cached bad!\n";
1180 transaction->setStatus(INVALID);
1181 transaction->setResult(temBAD_SIGNATURE);
1182 return false;
1183 }
1184
1185 auto const view = m_ledgerMaster.getCurrentLedger();
1186
1187 // This function is called by several different parts of the codebase
1188 // under no circumstances will we ever accept an inner txn within a batch
1189 // txn from the network.
1190 auto const sttx = *transaction->getSTransaction();
1191 if (sttx.isFlag(tfInnerBatchTxn) && view->rules().enabled(featureBatch))
1192 {
1193 transaction->setStatus(INVALID);
1194 transaction->setResult(temINVALID_FLAG);
1195 registry_.get().getHashRouter().setFlags(transaction->getID(), HashRouterFlags::BAD);
1196 return false;
1197 }
1198
1199 // NOTE ximinez - I think this check is redundant,
1200 // but I'm not 100% sure yet.
1201 // If so, only cost is looking up HashRouter flags.
1202 auto const [validity, reason] =
1203 checkValidity(registry_.get().getHashRouter(), sttx, view->rules());
1204 XRPL_ASSERT(
1205 validity == Validity::Valid, "xrpl::NetworkOPsImp::processTransaction : valid validity");
1206
1207 // Not concerned with local checks at this point.
1208 if (validity == Validity::SigBad)
1209 {
1210 JLOG(m_journal.info()) << "Transaction has bad signature: " << reason;
1211 transaction->setStatus(INVALID);
1212 transaction->setResult(temBAD_SIGNATURE);
1213 registry_.get().getHashRouter().setFlags(transaction->getID(), HashRouterFlags::BAD);
1214 return false;
1215 }
1216
1217 // canonicalize can change our pointer
1218 registry_.get().getMasterTransaction().canonicalize(&transaction);
1219
1220 return true;
1221}
1222
1223void
1224NetworkOPsImp::processTransaction(
1225 std::shared_ptr<Transaction>& transaction,
1226 bool bUnlimited,
1227 bool bLocal,
1228 FailHard failType)
1229{
1230 auto ev = m_job_queue.makeLoadEvent(jtTXN_PROC, "ProcessTXN");
1231
1232 // preProcessTransaction can change our pointer
1233 if (!preProcessTransaction(transaction))
1234 return;
1235
1236 if (bLocal)
1237 {
1238 doTransactionSync(transaction, bUnlimited, failType);
1239 }
1240 else
1241 {
1242 doTransactionAsync(transaction, bUnlimited, failType);
1243 }
1244}
1245
1246void
1247NetworkOPsImp::doTransactionAsync(
1248 std::shared_ptr<Transaction> transaction,
1249 bool bUnlimited,
1250 FailHard failType)
1251{
1252 std::lock_guard const lock(mMutex);
1253
1254 if (transaction->getApplying())
1255 return;
1256
1257 mTransactions.push_back(TransactionStatus(transaction, bUnlimited, false, failType));
1258 transaction->setApplying();
1259
1260 if (mDispatchState == DispatchState::none)
1261 {
1262 if (m_job_queue.addJob(jtBATCH, "TxBatchAsync", [this]() { transactionBatch(); }))
1263 {
1264 mDispatchState = DispatchState::scheduled;
1265 }
1266 }
1267}
1268
1269void
1270NetworkOPsImp::doTransactionSync(
1271 std::shared_ptr<Transaction> transaction,
1272 bool bUnlimited,
1273 FailHard failType)
1274{
1275 std::unique_lock<std::mutex> lock(mMutex);
1276
1277 if (!transaction->getApplying())
1278 {
1279 mTransactions.push_back(TransactionStatus(transaction, bUnlimited, true, failType));
1280 transaction->setApplying();
1281 }
1282
1283 doTransactionSyncBatch(lock, [&transaction](std::unique_lock<std::mutex> const&) {
1284 return transaction->getApplying();
1285 });
1286}
1287
1288void
1289NetworkOPsImp::doTransactionSyncBatch(
1291 std::function<bool(std::unique_lock<std::mutex> const&)> retryCallback)
1292{
1293 do
1294 {
1295 if (mDispatchState == DispatchState::running)
1296 {
1297 // A batch processing job is already running, so wait.
1298 mCond.wait(lock);
1299 }
1300 else
1301 {
1302 apply(lock);
1303
1304 if (!mTransactions.empty())
1305 {
1306 // More transactions need to be applied, but by another job.
1307 if (m_job_queue.addJob(jtBATCH, "TxBatchSync", [this]() { transactionBatch(); }))
1308 {
1309 mDispatchState = DispatchState::scheduled;
1310 }
1311 }
1312 }
1313 } while (retryCallback(lock));
1314}
1315
1316void
1317NetworkOPsImp::processTransactionSet(CanonicalTXSet const& set)
1318{
1319 auto ev = m_job_queue.makeLoadEvent(jtTXN_PROC, "ProcessTXNSet");
1321 candidates.reserve(set.size());
1322 for (auto const& [_, tx] : set)
1323 {
1324 std::string reason;
1325 auto transaction = std::make_shared<Transaction>(tx, reason, registry_.get().getApp());
1326
1327 if (transaction->getStatus() == INVALID)
1328 {
1329 if (!reason.empty())
1330 {
1331 JLOG(m_journal.trace()) << "Exception checking transaction: " << reason;
1332 }
1333 registry_.get().getHashRouter().setFlags(tx->getTransactionID(), HashRouterFlags::BAD);
1334 continue;
1335 }
1336
1337 // preProcessTransaction can change our pointer
1338 if (!preProcessTransaction(transaction))
1339 continue;
1340
1341 candidates.emplace_back(transaction);
1342 }
1343
1344 std::vector<TransactionStatus> transactions;
1345 transactions.reserve(candidates.size());
1346
1347 std::unique_lock lock(mMutex);
1348
1349 for (auto& transaction : candidates)
1350 {
1351 if (!transaction->getApplying())
1352 {
1353 transactions.emplace_back(transaction, false, false, FailHard::no);
1354 transaction->setApplying();
1355 }
1356 }
1357
1358 if (mTransactions.empty())
1359 {
1360 mTransactions.swap(transactions);
1361 }
1362 else
1363 {
1364 mTransactions.reserve(mTransactions.size() + transactions.size());
1365 for (auto& t : transactions)
1366 mTransactions.push_back(std::move(t));
1367 }
1368 if (mTransactions.empty())
1369 {
1370 JLOG(m_journal.debug()) << "No transaction to process!";
1371 return;
1372 }
1373
1374 doTransactionSyncBatch(lock, [&](std::unique_lock<std::mutex> const&) {
1375 XRPL_ASSERT(lock.owns_lock(), "xrpl::NetworkOPsImp::processTransactionSet has lock");
1376 return std::any_of(mTransactions.begin(), mTransactions.end(), [](auto const& t) {
1377 return t.transaction->getApplying();
1378 });
1379 });
1380}
1381
1382void
1383NetworkOPsImp::transactionBatch()
1384{
1385 std::unique_lock<std::mutex> lock(mMutex);
1386
1387 if (mDispatchState == DispatchState::running)
1388 return;
1389
1390 while (!mTransactions.empty())
1391 {
1392 apply(lock);
1393 }
1394}
1395
1396void
1397NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
1398{
1400 std::vector<TransactionStatus> transactions;
1401 mTransactions.swap(transactions);
1402 XRPL_ASSERT(!transactions.empty(), "xrpl::NetworkOPsImp::apply : non-empty transactions");
1403 XRPL_ASSERT(
1404 mDispatchState != DispatchState::running, "xrpl::NetworkOPsImp::apply : is not running");
1405
1406 mDispatchState = DispatchState::running;
1407
1408 batchLock.unlock();
1409
1410 {
1411 std::unique_lock masterLock{registry_.get().getApp().getMasterMutex(), std::defer_lock};
1412 bool changed = false;
1413 {
1414 std::unique_lock ledgerLock{m_ledgerMaster.peekMutex(), std::defer_lock};
1415 std::lock(masterLock, ledgerLock);
1416 registry_.get().getOpenLedger().modify([&](OpenView& view, beast::Journal j) {
1417 for (TransactionStatus& e : transactions)
1418 {
1419 // we check before adding to the batch
1420 ApplyFlags flags = tapNONE;
1421 if (e.admin)
1422 flags |= tapUNLIMITED;
1423
1424 if (e.failType == FailHard::yes)
1425 flags |= tapFAIL_HARD;
1426
1427 auto const result = registry_.get().getTxQ().apply(
1428 registry_.get().getApp(), view, e.transaction->getSTransaction(), flags, j);
1429 e.result = result.ter;
1430 e.applied = result.applied;
1431 changed = changed || result.applied;
1432 }
1433 return changed;
1434 });
1435 }
1436 if (changed)
1437 reportFeeChange();
1438
1439 std::optional<LedgerIndex> validatedLedgerIndex;
1440 if (auto const l = m_ledgerMaster.getValidatedLedger())
1441 validatedLedgerIndex = l->header().seq;
1442
1443 auto newOL = registry_.get().getOpenLedger().current();
1444 for (TransactionStatus const& e : transactions)
1445 {
1446 e.transaction->clearSubmitResult();
1447
1448 if (e.applied)
1449 {
1450 pubProposedTransaction(newOL, e.transaction->getSTransaction(), e.result);
1451 e.transaction->setApplied();
1452 }
1453
1454 e.transaction->setResult(e.result);
1455
1456 if (isTemMalformed(e.result))
1457 {
1458 registry_.get().getHashRouter().setFlags(
1459 e.transaction->getID(), HashRouterFlags::BAD);
1460 }
1461
1462#ifdef DEBUG
1463 if (!isTesSuccess(e.result))
1464 {
1465 std::string token, human;
1466
1467 if (transResultInfo(e.result, token, human))
1468 {
1469 JLOG(m_journal.info()) << "TransactionResult: " << token << ": " << human;
1470 }
1471 }
1472#endif
1473
1474 bool const addLocal = e.local;
1475
1476 if (isTesSuccess(e.result))
1477 {
1478 JLOG(m_journal.debug()) << "Transaction is now included in open ledger";
1479 e.transaction->setStatus(INCLUDED);
1480
1481 // Pop as many "reasonable" transactions for this account as
1482 // possible. "Reasonable" means they have sequential sequence
1483 // numbers, or use tickets.
1484 auto const& txCur = e.transaction->getSTransaction();
1485
1486 std::size_t count = 0;
1487 for (auto txNext = m_ledgerMaster.popAcctTransaction(txCur);
1488 txNext && count < maxPoppedTransactions;
1489 txNext = m_ledgerMaster.popAcctTransaction(txCur), ++count)
1490 {
1491 if (!batchLock.owns_lock())
1492 batchLock.lock();
1493 std::string reason;
1494 auto const trans = sterilize(*txNext);
1495 auto t = std::make_shared<Transaction>(trans, reason, registry_.get().getApp());
1496 if (t->getApplying())
1497 break;
1498 submit_held.emplace_back(t, false, false, FailHard::no);
1499 t->setApplying();
1500 }
1501 if (batchLock.owns_lock())
1502 batchLock.unlock();
1503 }
1504 else if (e.result == tefPAST_SEQ)
1505 {
1506 // duplicate or conflict
1507 JLOG(m_journal.info()) << "Transaction is obsolete";
1508 e.transaction->setStatus(OBSOLETE);
1509 }
1510 else if (e.result == terQUEUED)
1511 {
1512 JLOG(m_journal.debug()) << "Transaction is likely to claim a"
1513 << " fee, but is queued until fee drops";
1514
1515 e.transaction->setStatus(HELD);
1516 // Add to held transactions, because it could get
1517 // kicked out of the queue, and this will try to
1518 // put it back.
1519 m_ledgerMaster.addHeldTransaction(e.transaction);
1520 e.transaction->setQueued();
1521 e.transaction->setKept();
1522 }
1523 else if (isTerRetry(e.result) || isTelLocal(e.result) || isTefFailure(e.result))
1524 {
1525 if (e.failType != FailHard::yes)
1526 {
1527 auto const lastLedgerSeq =
1528 e.transaction->getSTransaction()->at(~sfLastLedgerSequence);
1529 auto const ledgersLeft = lastLedgerSeq
1530 ? *lastLedgerSeq - m_ledgerMaster.getCurrentLedgerIndex()
1532 // If any of these conditions are met, the transaction can
1533 // be held:
1534 // 1. It was submitted locally. (Note that this flag is only
1535 // true on the initial submission.)
1536 // 2. The transaction has a LastLedgerSequence, and the
1537 // LastLedgerSequence is fewer than LocalTxs::holdLedgers
1538 // (5) ledgers into the future. (Remember that an
1539 // unseated optional compares as less than all seated
1540 // values, so it has to be checked explicitly first.)
1541 // 3. The HashRouterFlags::BAD flag is not set on the txID.
1542 // (setFlags
1543 // checks before setting. If the flag is set, it returns
1544 // false, which means it's been held once without one of
1545 // the other conditions, so don't hold it again. Time's
1546 // up!)
1547 //
1548 if (e.local || (ledgersLeft && ledgersLeft <= LocalTxs::holdLedgers) ||
1549 registry_.get().getHashRouter().setFlags(
1550 e.transaction->getID(), HashRouterFlags::HELD))
1551 {
1552 // transaction should be held
1553 JLOG(m_journal.debug()) << "Transaction should be held: " << e.result;
1554 e.transaction->setStatus(HELD);
1555 m_ledgerMaster.addHeldTransaction(e.transaction);
1556 e.transaction->setKept();
1557 }
1558 else
1559 JLOG(m_journal.debug())
1560 << "Not holding transaction " << e.transaction->getID() << ": "
1561 << (e.local ? "local" : "network") << ", "
1562 << "result: " << e.result << " ledgers left: "
1563 << (ledgersLeft ? to_string(*ledgersLeft) : "unspecified");
1564 }
1565 }
1566 else
1567 {
1568 JLOG(m_journal.debug()) << "Status other than success " << e.result;
1569 e.transaction->setStatus(INVALID);
1570 }
1571
1572 auto const enforceFailHard = e.failType == FailHard::yes && !isTesSuccess(e.result);
1573
1574 if (addLocal && !enforceFailHard)
1575 {
1576 m_localTX->push_back(
1577 m_ledgerMaster.getCurrentLedgerIndex(), e.transaction->getSTransaction());
1578 e.transaction->setKept();
1579 }
1580
1581 if ((e.applied ||
1582 ((mMode != OperatingMode::FULL) && (e.failType != FailHard::yes) && e.local) ||
1583 (e.result == terQUEUED)) &&
1584 !enforceFailHard)
1585 {
1586 auto const toSkip =
1587 registry_.get().getHashRouter().shouldRelay(e.transaction->getID());
1588 if (auto const sttx = *(e.transaction->getSTransaction()); toSkip &&
1589 // Skip relaying if it's an inner batch txn. The flag should
1590 // only be set if the Batch feature is enabled. If Batch is
1591 // not enabled, the flag is always invalid, so don't relay
1592 // it regardless.
1593 !(sttx.isFlag(tfInnerBatchTxn)))
1594 {
1595 protocol::TMTransaction tx;
1596 Serializer s;
1597
1598 sttx.add(s);
1599 tx.set_rawtransaction(s.data(), s.size());
1600 tx.set_status(protocol::tsCURRENT);
1601 tx.set_receivetimestamp(
1602 registry_.get().getTimeKeeper().now().time_since_epoch().count());
1603 tx.set_deferred(e.result == terQUEUED);
1604 // FIXME: This should be when we received it
1605 registry_.get().getOverlay().relay(e.transaction->getID(), tx, *toSkip);
1606 e.transaction->setBroadcast();
1607 }
1608 }
1609
1610 if (validatedLedgerIndex)
1611 {
1612 auto [fee, accountSeq, availableSeq] =
1613 registry_.get().getTxQ().getTxRequiredFeeAndSeq(
1614 *newOL, e.transaction->getSTransaction());
1615 e.transaction->setCurrentLedgerState(
1616 *validatedLedgerIndex, fee, accountSeq, availableSeq);
1617 }
1618 }
1619 }
1620
1621 batchLock.lock();
1622
1623 for (TransactionStatus const& e : transactions)
1624 e.transaction->clearApplying();
1625
1626 if (!submit_held.empty())
1627 {
1628 if (mTransactions.empty())
1629 {
1630 mTransactions.swap(submit_held);
1631 }
1632 else
1633 {
1634 mTransactions.reserve(mTransactions.size() + submit_held.size());
1635 for (auto& e : submit_held)
1636 mTransactions.push_back(std::move(e));
1637 }
1638 }
1639
1640 mCond.notify_all();
1641
1642 mDispatchState = DispatchState::none;
1643}
1644
1645//
1646// Owner functions
1647//
1648
1650NetworkOPsImp::getOwnerInfo(std::shared_ptr<ReadView const> lpLedger, AccountID const& account)
1651{
1652 Json::Value jvObjects(Json::objectValue);
1653 auto root = keylet::ownerDir(account);
1654 auto sleNode = lpLedger->read(keylet::page(root));
1655 if (sleNode)
1656 {
1657 std::uint64_t uNodeDir = 0;
1658
1659 do
1660 {
1661 for (auto const& uDirEntry : sleNode->getFieldV256(sfIndexes))
1662 {
1663 auto sleCur = lpLedger->read(keylet::child(uDirEntry));
1664 XRPL_ASSERT(sleCur, "xrpl::NetworkOPsImp::getOwnerInfo : non-null child SLE");
1665
1666 switch (sleCur->getType())
1667 {
1668 case ltOFFER:
1669 if (!jvObjects.isMember(jss::offers))
1670 jvObjects[jss::offers] = Json::Value(Json::arrayValue);
1671
1672 jvObjects[jss::offers].append(sleCur->getJson(JsonOptions::none));
1673 break;
1674
1675 case ltRIPPLE_STATE:
1676 if (!jvObjects.isMember(jss::ripple_lines))
1677 {
1678 jvObjects[jss::ripple_lines] = Json::Value(Json::arrayValue);
1679 }
1680
1681 jvObjects[jss::ripple_lines].append(sleCur->getJson(JsonOptions::none));
1682 break;
1683
1684 case ltACCOUNT_ROOT:
1685 case ltDIR_NODE:
1686 // LCOV_EXCL_START
1687 default:
1688 UNREACHABLE(
1689 "xrpl::NetworkOPsImp::getOwnerInfo : invalid "
1690 "type");
1691 break;
1692 // LCOV_EXCL_STOP
1693 }
1694 }
1695
1696 uNodeDir = sleNode->getFieldU64(sfIndexNext);
1697
1698 if (uNodeDir != 0u)
1699 {
1700 sleNode = lpLedger->read(keylet::page(root, uNodeDir));
1701 XRPL_ASSERT(sleNode, "xrpl::NetworkOPsImp::getOwnerInfo : read next page");
1702 }
1703 } while (uNodeDir != 0u);
1704 }
1705
1706 return jvObjects;
1707}
1708
1709//
1710// Other
1711//
1712
1713inline bool
1714NetworkOPsImp::isBlocked()
1715{
1716 return isAmendmentBlocked() || isUNLBlocked();
1717}
1718
inline bool
NetworkOPsImp::isAmendmentBlocked()
{
    // Reports whether this server has been flagged as amendment blocked
    // (see setAmendmentBlocked).
    return amendmentBlocked_;
}
1724
void
NetworkOPsImp::setAmendmentBlocked()
{
    // Flag the server as amendment blocked, then demote it to CONNECTED.
    // The flag is set first: setMode() consults isBlocked() and will refuse
    // to move a blocked server above CONNECTED.
    amendmentBlocked_ = true;
    setMode(OperatingMode::CONNECTED);
}
1731
1732inline bool
1733NetworkOPsImp::isAmendmentWarned()
1734{
1735 return !amendmentBlocked_ && amendmentWarned_;
1736}
1737
inline void
NetworkOPsImp::setAmendmentWarned()
{
    // Mark that an unsupported amendment has reached majority (see
    // isAmendmentWarned / clearAmendmentWarned).
    amendmentWarned_ = true;
}
1743
inline void
NetworkOPsImp::clearAmendmentWarned()
{
    // Reset the amendment-majority warning flag.
    amendmentWarned_ = false;
}
1749
inline bool
NetworkOPsImp::isUNLBlocked()
{
    // Reports whether this server has been flagged as UNL blocked
    // (see setUNLBlocked).
    return unlBlocked_;
}
1755
void
NetworkOPsImp::setUNLBlocked()
{
    // Flag the server as UNL blocked, then demote it to CONNECTED. The flag
    // is set first: setMode() consults isBlocked() and will refuse to move a
    // blocked server above CONNECTED.
    unlBlocked_ = true;
    setMode(OperatingMode::CONNECTED);
}
1762
inline void
NetworkOPsImp::clearUNLBlocked()
{
    // Reset the UNL-blocked flag (the mode is not restored here; normal
    // operation will re-promote the server).
    unlBlocked_ = false;
}
1768
1769bool
1770NetworkOPsImp::checkLastClosedLedger(Overlay::PeerSequence const& peerList, uint256& networkClosed)
1771{
1772 // Returns true if there's an *abnormal* ledger issue, normal changing in
1773 // TRACKING mode should return false. Do we have sufficient validations for
1774 // our last closed ledger? Or do sufficient nodes agree? And do we have no
1775 // better ledger available? If so, we are either tracking or full.
1776
1777 JLOG(m_journal.trace()) << "NetworkOPsImp::checkLastClosedLedger";
1778
1779 auto const ourClosed = m_ledgerMaster.getClosedLedger();
1780
1781 if (!ourClosed)
1782 return false;
1783
1784 uint256 closedLedger = ourClosed->header().hash;
1785 uint256 const prevClosedLedger = ourClosed->header().parentHash;
1786 JLOG(m_journal.trace()) << "OurClosed: " << closedLedger;
1787 JLOG(m_journal.trace()) << "PrevClosed: " << prevClosedLedger;
1788
1789 //-------------------------------------------------------------------------
1790 // Determine preferred last closed ledger
1791
1792 auto& validations = registry_.get().getValidations();
1793 JLOG(m_journal.debug()) << "ValidationTrie " << Json::Compact(validations.getJsonTrie());
1794
1795 // Will rely on peer LCL if no trusted validations exist
1797 peerCounts[closedLedger] = 0;
1798 if (mMode >= OperatingMode::TRACKING)
1799 peerCounts[closedLedger]++;
1800
1801 for (auto& peer : peerList)
1802 {
1803 uint256 const peerLedger = peer->getClosedLedgerHash();
1804
1805 if (peerLedger.isNonZero())
1806 ++peerCounts[peerLedger];
1807 }
1808
1809 for (auto const& it : peerCounts)
1810 JLOG(m_journal.debug()) << "L: " << it.first << " n=" << it.second;
1811
1812 uint256 const preferredLCL = validations.getPreferredLCL(
1813 RCLValidatedLedger{ourClosed, validations.adaptor().journal()},
1814 m_ledgerMaster.getValidLedgerIndex(),
1815 peerCounts);
1816
1817 bool switchLedgers = preferredLCL != closedLedger;
1818 if (switchLedgers)
1819 closedLedger = preferredLCL;
1820 //-------------------------------------------------------------------------
1821 if (switchLedgers && (closedLedger == prevClosedLedger))
1822 {
1823 // don't switch to our own previous ledger
1824 JLOG(m_journal.info()) << "We won't switch to our own previous ledger";
1825 networkClosed = ourClosed->header().hash;
1826 switchLedgers = false;
1827 }
1828 else
1829 {
1830 networkClosed = closedLedger;
1831 }
1832
1833 if (!switchLedgers)
1834 return false;
1835
1836 auto consensus = m_ledgerMaster.getLedgerByHash(closedLedger);
1837
1838 if (!consensus)
1839 {
1840 consensus = registry_.get().getInboundLedgers().acquire(
1841 closedLedger, 0, InboundLedger::Reason::CONSENSUS);
1842 }
1843
1844 if (consensus &&
1845 (!m_ledgerMaster.canBeCurrent(consensus) ||
1846 !m_ledgerMaster.isCompatible(*consensus, m_journal.debug(), "Not switching")))
1847 {
1848 // Don't switch to a ledger not on the validated chain
1849 // or with an invalid close time or sequence
1850 networkClosed = ourClosed->header().hash;
1851 return false;
1852 }
1853
1854 JLOG(m_journal.warn()) << "We are not running on the consensus ledger";
1855 JLOG(m_journal.info()) << "Our LCL: " << ourClosed->header().hash << getJson({*ourClosed, {}});
1856 JLOG(m_journal.info()) << "Net LCL " << closedLedger;
1857
1858 if ((mMode == OperatingMode::TRACKING) || (mMode == OperatingMode::FULL))
1859 {
1860 setMode(OperatingMode::CONNECTED);
1861 }
1862
1863 if (consensus)
1864 {
1865 // FIXME: If this rewinds the ledger sequence, or has the same
1866 // sequence, we should update the status on any stored transactions
1867 // in the invalidated ledgers.
1868 switchLastClosedLedger(consensus);
1869 }
1870
1871 return true;
1872}
1873
1874void
1875NetworkOPsImp::switchLastClosedLedger(std::shared_ptr<Ledger const> const& newLCL)
1876{
1877 // set the newLCL as our last closed ledger -- this is abnormal code
1878 JLOG(m_journal.error()) << "JUMP last closed ledger to " << newLCL->header().hash;
1879
1880 clearNeedNetworkLedger();
1881
1882 // Update fee computations.
1883 registry_.get().getTxQ().processClosedLedger(registry_.get().getApp(), *newLCL, true);
1884
1885 // Caller must own master lock
1886 {
1887 // Apply tx in old open ledger to new
1888 // open ledger. Then apply local tx.
1889
1890 auto retries = m_localTX->getTxSet();
1891 auto const lastVal = registry_.get().getLedgerMaster().getValidatedLedger();
1893 if (lastVal)
1894 {
1895 rules = makeRulesGivenLedger(*lastVal, registry_.get().getApp().config().features);
1896 }
1897 else
1898 {
1899 rules.emplace(registry_.get().getApp().config().features);
1900 }
1901 registry_.get().getOpenLedger().accept(
1902 registry_.get().getApp(),
1903 *rules,
1904 newLCL,
1905 OrderedTxs({}),
1906 false,
1907 retries,
1908 tapNONE,
1909 "jump",
1910 [&](OpenView& view, beast::Journal j) {
1911 // Stuff the ledger with transactions from the queue.
1912 return registry_.get().getTxQ().accept(registry_.get().getApp(), view);
1913 });
1914 }
1915
1916 m_ledgerMaster.switchLCL(newLCL);
1917
1918 protocol::TMStatusChange s;
1919 s.set_newevent(protocol::neSWITCHED_LEDGER);
1920 s.set_ledgerseq(newLCL->header().seq);
1921 s.set_networktime(registry_.get().getTimeKeeper().now().time_since_epoch().count());
1922 s.set_ledgerhashprevious(
1923 newLCL->header().parentHash.begin(), newLCL->header().parentHash.size());
1924 s.set_ledgerhash(newLCL->header().hash.begin(), newLCL->header().hash.size());
1925 registry_.get().getOverlay().foreach(
1926 send_always(std::make_shared<Message>(s, protocol::mtSTATUS_CHANGE)));
1927}
1928
1929bool
1930NetworkOPsImp::beginConsensus(
1931 uint256 const& networkClosed,
1933{
1934 XRPL_ASSERT(networkClosed.isNonZero(), "xrpl::NetworkOPsImp::beginConsensus : nonzero input");
1935
1936 auto closingInfo = m_ledgerMaster.getCurrentLedger()->header();
1937
1938 JLOG(m_journal.info()) << "Consensus time for #" << closingInfo.seq << " with LCL "
1939 << closingInfo.parentHash;
1940
1941 auto prevLedger = m_ledgerMaster.getLedgerByHash(closingInfo.parentHash);
1942
1943 if (!prevLedger)
1944 {
1945 // this shouldn't happen unless we jump ledgers
1946 if (mMode == OperatingMode::FULL)
1947 {
1948 JLOG(m_journal.warn()) << "Don't have LCL, going to tracking";
1949 setMode(OperatingMode::TRACKING);
1950 CLOG(clog) << "beginConsensus Don't have LCL, going to tracking. ";
1951 }
1952
1953 CLOG(clog) << "beginConsensus no previous ledger. ";
1954 return false;
1955 }
1956
1957 XRPL_ASSERT(
1958 prevLedger->header().hash == closingInfo.parentHash,
1959 "xrpl::NetworkOPsImp::beginConsensus : prevLedger hash matches "
1960 "parent");
1961 XRPL_ASSERT(
1962 closingInfo.parentHash == m_ledgerMaster.getClosedLedger()->header().hash,
1963 "xrpl::NetworkOPsImp::beginConsensus : closedLedger parent matches "
1964 "hash");
1965
1966 registry_.get().getValidators().setNegativeUNL(prevLedger->negativeUNL());
1967 TrustChanges const changes = registry_.get().getValidators().updateTrusted(
1968 registry_.get().getValidations().getCurrentNodeIDs(),
1969 closingInfo.parentCloseTime,
1970 *this,
1971 registry_.get().getOverlay(),
1972 registry_.get().getHashRouter());
1973
1974 if (!changes.added.empty() || !changes.removed.empty())
1975 {
1976 registry_.get().getValidations().trustChanged(changes.added, changes.removed);
1977 // Update the AmendmentTable so it tracks the current validators.
1978 registry_.get().getAmendmentTable().trustChanged(
1979 registry_.get().getValidators().getQuorumKeys().second);
1980 }
1981
1982 mConsensus.startRound(
1983 registry_.get().getTimeKeeper().closeTime(),
1984 networkClosed,
1985 prevLedger,
1986 changes.removed,
1987 changes.added,
1988 clog);
1989
1990 ConsensusPhase const currPhase = mConsensus.phase();
1991 if (mLastConsensusPhase != currPhase)
1992 {
1993 reportConsensusStateChange(currPhase);
1994 mLastConsensusPhase = currPhase;
1995 }
1996
1997 JLOG(m_journal.debug()) << "Initiating consensus engine";
1998 return true;
1999}
2000
2001bool
2002NetworkOPsImp::processTrustedProposal(RCLCxPeerPos peerPos)
2003{
2004 auto const& peerKey = peerPos.publicKey();
2005 if (validatorPK_ == peerKey || validatorMasterPK_ == peerKey)
2006 {
2007 // Could indicate a operator misconfiguration where two nodes are
2008 // running with the same validator key configured, so this isn't fatal,
2009 // and it doesn't necessarily indicate peer misbehavior. But since this
2010 // is a trusted message, it could be a very big deal. Either way, we
2011 // don't want to relay the proposal. Note that the byzantine behavior
2012 // detection in handleNewValidation will notify other peers.
2013 //
2014 // Another, innocuous explanation is unusual message routing and delays,
2015 // causing this node to receive its own messages back.
2016 JLOG(m_journal.error()) << "Received a proposal signed by MY KEY from a peer. This may "
2017 "indicate a misconfiguration where another node has the same "
2018 "validator key, or may be caused by unusual message routing and "
2019 "delays.";
2020 return false;
2021 }
2022
2023 return mConsensus.peerProposal(registry_.get().getTimeKeeper().closeTime(), peerPos);
2024}
2025
2026void
2027NetworkOPsImp::mapComplete(std::shared_ptr<SHAMap> const& map, bool fromAcquire)
2028{
2029 // We now have an additional transaction set
2030 // Inform peers we have this set
2031 protocol::TMHaveTransactionSet msg;
2032 msg.set_hash(map->getHash().as_uint256().begin(), 256 / 8);
2033 msg.set_status(protocol::tsHAVE);
2034 registry_.get().getOverlay().foreach(
2035 send_always(std::make_shared<Message>(msg, protocol::mtHAVE_SET)));
2036
2037 // We acquired it because consensus asked us to
2038 if (fromAcquire)
2039 mConsensus.gotTxSet(registry_.get().getTimeKeeper().closeTime(), RCLTxSet{map});
2040}
2041
2042void
2043NetworkOPsImp::endConsensus(std::unique_ptr<std::stringstream> const& clog)
2044{
2045 uint256 const deadLedger = m_ledgerMaster.getClosedLedger()->header().parentHash;
2046 for (auto const& it : registry_.get().getOverlay().getActivePeers())
2047 {
2048 if (it && (it->getClosedLedgerHash() == deadLedger))
2049 {
2050 JLOG(m_journal.trace()) << "Killing obsolete peer status";
2051 it->cycleStatus();
2052 }
2053 }
2054
2055 uint256 networkClosed;
2056 bool const ledgerChange =
2057 checkLastClosedLedger(registry_.get().getOverlay().getActivePeers(), networkClosed);
2058
2059 if (networkClosed.isZero())
2060 {
2061 CLOG(clog) << "endConsensus last closed ledger is zero. ";
2062 return;
2063 }
2064
2065 // WRITEME: Unless we are in FULL and in the process of doing a consensus,
2066 // we must count how many nodes share our LCL, how many nodes disagree with
2067 // our LCL, and how many validations our LCL has. We also want to check
2068 // timing to make sure there shouldn't be a newer LCL. We need this
2069 // information to do the next three tests.
2070
2071 if (((mMode == OperatingMode::CONNECTED) || (mMode == OperatingMode::SYNCING)) && !ledgerChange)
2072 {
2073 // Count number of peers that agree with us and UNL nodes whose
2074 // validations we have for LCL. If the ledger is good enough, go to
2075 // TRACKING - TODO
2076 if (!needNetworkLedger_)
2077 setMode(OperatingMode::TRACKING);
2078 }
2079
2080 if (((mMode == OperatingMode::CONNECTED) || (mMode == OperatingMode::TRACKING)) &&
2081 !ledgerChange)
2082 {
2083 // check if the ledger is good enough to go to FULL
2084 // Note: Do not go to FULL if we don't have the previous ledger
2085 // check if the ledger is bad enough to go to CONNECTED -- TODO
2086 auto current = m_ledgerMaster.getCurrentLedger();
2087 if (registry_.get().getTimeKeeper().now() <
2088 (current->header().parentCloseTime + 2 * current->header().closeTimeResolution))
2089 {
2090 setMode(OperatingMode::FULL);
2091 }
2092 }
2093
2094 beginConsensus(networkClosed, clog);
2095}
2096
2097void
2098NetworkOPsImp::consensusViewChange()
2099{
2100 if ((mMode == OperatingMode::FULL) || (mMode == OperatingMode::TRACKING))
2101 {
2102 setMode(OperatingMode::CONNECTED);
2103 }
2104}
2105
2106void
2107NetworkOPsImp::pubManifest(Manifest const& mo)
2108{
2109 // VFALCO consider std::shared_mutex
2110 std::lock_guard const sl(mSubLock);
2111
2112 if (!mStreamMaps[sManifests].empty())
2113 {
2115
2116 jvObj[jss::type] = "manifestReceived";
2117 jvObj[jss::master_key] = toBase58(TokenType::NodePublic, mo.masterKey);
2118 if (mo.signingKey)
2119 jvObj[jss::signing_key] = toBase58(TokenType::NodePublic, *mo.signingKey);
2120 jvObj[jss::seq] = Json::UInt(mo.sequence);
2121 if (auto sig = mo.getSignature())
2122 jvObj[jss::signature] = strHex(*sig);
2123 jvObj[jss::master_signature] = strHex(mo.getMasterSignature());
2124 if (!mo.domain.empty())
2125 jvObj[jss::domain] = mo.domain;
2126 jvObj[jss::manifest] = strHex(mo.serialized);
2127
2128 for (auto i = mStreamMaps[sManifests].begin(); i != mStreamMaps[sManifests].end();)
2129 {
2130 if (auto p = i->second.lock())
2131 {
2132 p->send(jvObj, true);
2133 ++i;
2134 }
2135 else
2136 {
2137 i = mStreamMaps[sManifests].erase(i);
2138 }
2139 }
2140 }
2141}
2142
// Snapshot of the fee/load state used to decide whether a "serverStatus"
// update needs to be published (compared via operator!=).
NetworkOPsImp::ServerFeeSummary::ServerFeeSummary(
    XRPAmount fee,
    TxQ::Metrics escalationMetrics, // trivially copyable
    LoadFeeTrack const& loadFeeTrack)
    : loadFactorServer{loadFeeTrack.getLoadFactor()}
    , loadBaseServer{loadFeeTrack.getLoadBase()}
    , baseFee{fee}
    , em{escalationMetrics}
{
}
2153
2154bool
2156{
2157 if (loadFactorServer != b.loadFactorServer || loadBaseServer != b.loadBaseServer ||
2158 baseFee != b.baseFee || em.has_value() != b.em.has_value())
2159 return true;
2160
2161 if (em && b.em)
2162 {
2163 return (
2164 em->minProcessingFeeLevel != b.em->minProcessingFeeLevel ||
2165 em->openLedgerFeeLevel != b.em->openLedgerFeeLevel ||
2166 em->referenceFeeLevel != b.em->referenceFeeLevel);
2167 }
2168
2169 return false;
2170}
2171
// Need to cap to uint64 to uint32 due to JSON limitations.
// Saturates: values above UINT32_MAX clamp to UINT32_MAX.
// NOTE(review): the signature and max32 lines (original 2174, 2176) were
// elided in the extracted listing and have been reconstructed.
static std::uint32_t
trunc32(std::uint64_t v)
{
    constexpr std::uint64_t max32 = std::numeric_limits<std::uint32_t>::max();

    return std::min(max32, v);
};
2180
2181void
2183{
2184 // VFALCO TODO Don't hold the lock across calls to send...make a copy of the
2185 // list into a local array while holding the lock then release
2186 // the lock and call send on everyone.
2187 //
2188 std::lock_guard const sl(mSubLock);
2189
2190 if (!mStreamMaps[sServer].empty())
2191 {
2193
2195 registry_.get().getOpenLedger().current()->fees().base,
2196 registry_.get().getTxQ().getMetrics(*registry_.get().getOpenLedger().current()),
2197 registry_.get().getFeeTrack()};
2198
2199 jvObj[jss::type] = "serverStatus";
2200 jvObj[jss::server_status] = strOperatingMode();
2201 jvObj[jss::load_base] = f.loadBaseServer;
2202 jvObj[jss::load_factor_server] = f.loadFactorServer;
2203 jvObj[jss::base_fee] = f.baseFee.jsonClipped();
2204
2205 if (f.em)
2206 {
2207 auto const loadFactor = std::max(
2208 safe_cast<std::uint64_t>(f.loadFactorServer),
2209 mulDiv(f.em->openLedgerFeeLevel, f.loadBaseServer, f.em->referenceFeeLevel)
2211
2212 jvObj[jss::load_factor] = trunc32(loadFactor);
2213 jvObj[jss::load_factor_fee_escalation] = f.em->openLedgerFeeLevel.jsonClipped();
2214 jvObj[jss::load_factor_fee_queue] = f.em->minProcessingFeeLevel.jsonClipped();
2215 jvObj[jss::load_factor_fee_reference] = f.em->referenceFeeLevel.jsonClipped();
2216 }
2217 else
2218 {
2219 jvObj[jss::load_factor] = f.loadFactorServer;
2220 }
2221
2222 mLastFeeSummary = f;
2223
2224 for (auto i = mStreamMaps[sServer].begin(); i != mStreamMaps[sServer].end();)
2225 {
2226 InfoSub::pointer const p = i->second.lock();
2227
2228 // VFALCO TODO research the possibility of using thread queues and
2229 // linearizing the deletion of subscribers with the
2230 // sending of JSON data.
2231 if (p)
2232 {
2233 p->send(jvObj, true);
2234 ++i;
2235 }
2236 else
2237 {
2238 i = mStreamMaps[sServer].erase(i);
2239 }
2240 }
2241 }
2242}
2243
2244void
2246{
2247 std::lock_guard const sl(mSubLock);
2248
2249 auto& streamMap = mStreamMaps[sConsensusPhase];
2250 if (!streamMap.empty())
2251 {
2253 jvObj[jss::type] = "consensusPhase";
2254 jvObj[jss::consensus] = to_string(phase);
2255
2256 for (auto i = streamMap.begin(); i != streamMap.end();)
2257 {
2258 if (auto p = i->second.lock())
2259 {
2260 p->send(jvObj, true);
2261 ++i;
2262 }
2263 else
2264 {
2265 i = streamMap.erase(i);
2266 }
2267 }
2268 }
2269}
2270
2271void
2273{
2274 // VFALCO consider std::shared_mutex
2275 std::lock_guard const sl(mSubLock);
2276
2277 if (!mStreamMaps[sValidations].empty())
2278 {
2280
2281 auto const signerPublic = val->getSignerPublic();
2282
2283 jvObj[jss::type] = "validationReceived";
2284 jvObj[jss::validation_public_key] = toBase58(TokenType::NodePublic, signerPublic);
2285 jvObj[jss::ledger_hash] = to_string(val->getLedgerHash());
2286 jvObj[jss::signature] = strHex(val->getSignature());
2287 jvObj[jss::full] = val->isFull();
2288 jvObj[jss::flags] = val->getFlags();
2289 jvObj[jss::signing_time] = *(*val)[~sfSigningTime];
2290 jvObj[jss::data] = strHex(val->getSerializer().slice());
2291 jvObj[jss::network_id] = registry_.get().getNetworkIDService().getNetworkID();
2292
2293 if (auto version = (*val)[~sfServerVersion])
2294 jvObj[jss::server_version] = std::to_string(*version);
2295
2296 if (auto cookie = (*val)[~sfCookie])
2297 jvObj[jss::cookie] = std::to_string(*cookie);
2298
2299 if (auto hash = (*val)[~sfValidatedHash])
2300 jvObj[jss::validated_hash] = strHex(*hash);
2301
2302 auto const masterKey = registry_.get().getValidatorManifests().getMasterKey(signerPublic);
2303
2304 if (masterKey != signerPublic)
2305 jvObj[jss::master_key] = toBase58(TokenType::NodePublic, masterKey);
2306
2307 // NOTE *seq is a number, but old API versions used string. We replace
2308 // number with a string using MultiApiJson near end of this function
2309 if (auto const seq = (*val)[~sfLedgerSequence])
2310 jvObj[jss::ledger_index] = *seq;
2311
2312 if (val->isFieldPresent(sfAmendments))
2313 {
2314 jvObj[jss::amendments] = Json::Value(Json::arrayValue);
2315 for (auto const& amendment : val->getFieldV256(sfAmendments))
2316 jvObj[jss::amendments].append(to_string(amendment));
2317 }
2318
2319 if (auto const closeTime = (*val)[~sfCloseTime])
2320 jvObj[jss::close_time] = *closeTime;
2321
2322 if (auto const loadFee = (*val)[~sfLoadFee])
2323 jvObj[jss::load_fee] = *loadFee;
2324
2325 if (auto const baseFee = val->at(~sfBaseFee))
2326 jvObj[jss::base_fee] = static_cast<double>(*baseFee);
2327
2328 if (auto const reserveBase = val->at(~sfReserveBase))
2329 jvObj[jss::reserve_base] = *reserveBase;
2330
2331 if (auto const reserveInc = val->at(~sfReserveIncrement))
2332 jvObj[jss::reserve_inc] = *reserveInc;
2333
2334 // (The ~ operator converts the Proxy to a std::optional, which
2335 // simplifies later operations)
2336 if (auto const baseFeeXRP = ~val->at(~sfBaseFeeDrops); baseFeeXRP && baseFeeXRP->native())
2337 jvObj[jss::base_fee] = baseFeeXRP->xrp().jsonClipped();
2338
2339 if (auto const reserveBaseXRP = ~val->at(~sfReserveBaseDrops);
2340 reserveBaseXRP && reserveBaseXRP->native())
2341 jvObj[jss::reserve_base] = reserveBaseXRP->xrp().jsonClipped();
2342
2343 if (auto const reserveIncXRP = ~val->at(~sfReserveIncrementDrops);
2344 reserveIncXRP && reserveIncXRP->native())
2345 jvObj[jss::reserve_inc] = reserveIncXRP->xrp().jsonClipped();
2346
2347 // NOTE Use MultiApiJson to publish two slightly different JSON objects
2348 // for consumers supporting different API versions
2349 MultiApiJson multiObj{jvObj};
2350 multiObj.visit(
2351 RPC::apiVersion<1>, //
2352 [](Json::Value& jvTx) {
2353 // Type conversion for older API versions to string
2354 if (jvTx.isMember(jss::ledger_index))
2355 {
2356 jvTx[jss::ledger_index] = std::to_string(jvTx[jss::ledger_index].asUInt());
2357 }
2358 });
2359
2360 for (auto i = mStreamMaps[sValidations].begin(); i != mStreamMaps[sValidations].end();)
2361 {
2362 if (auto p = i->second.lock())
2363 {
2364 multiObj.visit(
2365 p->getApiVersion(), //
2366 [&](Json::Value const& jv) { p->send(jv, true); });
2367 ++i;
2368 }
2369 else
2370 {
2371 i = mStreamMaps[sValidations].erase(i);
2372 }
2373 }
2374 }
2375}
2376
2377void
2379{
2380 std::lock_guard const sl(mSubLock);
2381
2382 if (!mStreamMaps[sPeerStatus].empty())
2383 {
2384 Json::Value jvObj(func());
2385
2386 jvObj[jss::type] = "peerStatusChange";
2387
2388 for (auto i = mStreamMaps[sPeerStatus].begin(); i != mStreamMaps[sPeerStatus].end();)
2389 {
2390 InfoSub::pointer const p = i->second.lock();
2391
2392 if (p)
2393 {
2394 p->send(jvObj, true);
2395 ++i;
2396 }
2397 else
2398 {
2399 i = mStreamMaps[sPeerStatus].erase(i);
2400 }
2401 }
2402 }
2403}
2404
2405void
2407{
2408 using namespace std::chrono_literals;
2409 if (om == OperatingMode::CONNECTED)
2410 {
2411 if (registry_.get().getLedgerMaster().getValidatedLedgerAge() < 1min)
2413 }
2414 else if (om == OperatingMode::SYNCING)
2415 {
2416 if (registry_.get().getLedgerMaster().getValidatedLedgerAge() >= 1min)
2418 }
2419
2420 if ((om > OperatingMode::CONNECTED) && isBlocked())
2422
2423 if (mMode == om)
2424 return;
2425
2426 mMode = om;
2427
2428 accounting_.mode(om);
2429
2430 JLOG(m_journal.info()) << "STATE->" << strOperatingMode();
2431 pubServer();
2432}
2433
// Handle a validation received for ledger `val` from `source`. Deduplicates
// concurrent handling of the same ledger hash via pendingValidations_,
// releases the caller-held lock around the potentially slow
// handleNewValidation() call, then publishes the validation to subscribers.
// Returns true when the validation should be relayed to peers.
// NOTE(review): the declarator and the acquisition of `lock` are absent from
// this listing (gaps at 2435 and 2439) — confirm the parameter list and the
// mutex guarded by `lock` against the upstream file.
2434 bool
2436 {
2437 JLOG(m_journal.trace()) << "recvValidation " << val->getLedgerHash() << " from " << source;
2438
2440 BypassAccept bypassAccept = BypassAccept::no;
2441 try
2442 {
// If this ledger hash is already being handled by another thread, skip the
// expensive accept path but still run the rest of the pipeline.
2443 if (pendingValidations_.contains(val->getLedgerHash()))
2444 {
2445 bypassAccept = BypassAccept::yes;
2446 }
2447 else
2448 {
2449 pendingValidations_.insert(val->getLedgerHash());
2450 }
// Drop the lock while calling out; it is reacquired when `unlock` destructs.
2451 scope_unlock const unlock(lock);
2452 handleNewValidation(registry_.get().getApp(), val, source, bypassAccept, m_journal);
2453 }
2454 catch (std::exception const& e)
2455 {
2456 JLOG(m_journal.warn()) << "Exception thrown for handling new validation "
2457 << val->getLedgerHash() << ": " << e.what();
2458 }
2459 catch (...)
2460 {
2461 JLOG(m_journal.warn()) << "Unknown exception thrown for handling new validation "
2462 << val->getLedgerHash();
2463 }
// Only the thread that inserted the pending entry removes it; removal happens
// even when handleNewValidation threw (both catch blocks fall through here).
2464 if (bypassAccept == BypassAccept::no)
2465 {
2466 pendingValidations_.erase(val->getLedgerHash());
2467 }
2468 lock.unlock();
2469
2470 pubValidation(val);
2471
// Lazy debug rendering: the lambda body only runs when debug logging is on.
// NOTE(review): the declaration of `ss` (line 2473) is missing from this
// listing — presumably a std::stringstream; confirm upstream.
2472 JLOG(m_journal.debug()) << [this, &val]() -> auto {
2474 ss << "VALIDATION: " << val->render() << " master_key: ";
2475 auto master = registry_.get().getValidators().getTrustedKey(val->getSignerPublic());
2476 if (master)
2477 {
2478 ss << toBase58(TokenType::NodePublic, *master);
2479 }
2480 else
2481 {
2482 ss << "none";
2483 }
2484 return ss.str();
2485 }();
2486
2487 // We will always relay trusted validations; if configured, we will
2488 // also relay all untrusted validations.
2489 return registry_.get().getApp().config().RELAY_UNTRUSTED_VALIDATIONS == 1 || val->isTrusted();
2490}
2491
2494{
2495 return mConsensus.getJson(true);
2496}
2497
// Build the server_info / server_state response object.
// `human`    — human-readable units (XRP decimals, ratios) instead of raw ints
// `admin`    — include privileged fields (node_size, validator list, load, …)
// `counters` — include perf-log and nodestore counters
// Returns the populated `info` object.
// NOTE(review): this listing has numbering gaps (2498, 2501, 2559, 2640,
// 2642, 2677, 2693, 2722, 2791, 2814, 2865, 2873-2874): the declaration of
// the result object `info`, several guarding conditions and expression
// fragments are not visible here — verify each flagged spot upstream.
2499 NetworkOPsImp::getServerInfo(bool human, bool admin, bool counters)
2500 {
2502
2503 // System-level warnings
2504 {
2505 Json::Value warnings{Json::arrayValue};
2506 if (isAmendmentBlocked())
2507 {
2508 Json::Value& w = warnings.append(Json::objectValue);
2509 w[jss::id] = warnRPC_AMENDMENT_BLOCKED;
2510 w[jss::message] =
2511 "This server is amendment blocked, and must be updated to be "
2512 "able to stay in sync with the network.";
2513 }
2514 if (isUNLBlocked())
2515 {
2516 Json::Value& w = warnings.append(Json::objectValue);
2517 w[jss::id] = warnRPC_EXPIRED_VALIDATOR_LIST;
2518 w[jss::message] =
2519 "This server has an expired validator list. validators.txt "
2520 "may be incorrectly configured or some [validator_list_sites] "
2521 "may be unreachable.";
2522 }
2523 if (admin && isAmendmentWarned())
2524 {
2525 Json::Value& w = warnings.append(Json::objectValue);
2526 w[jss::id] = warnRPC_UNSUPPORTED_MAJORITY;
2527 w[jss::message] =
2528 "One or more unsupported amendments have reached majority. "
2529 "Upgrade to the latest version before they are activated "
2530 "to avoid being amendment blocked.";
2531 if (auto const expected =
2532 registry_.get().getAmendmentTable().firstUnsupportedExpected())
2533 {
2534 auto& d = w[jss::details] = Json::objectValue;
2535 d[jss::expected_date] = expected->time_since_epoch().count();
2536 d[jss::expected_date_UTC] = to_string(*expected);
2537 }
2538 }
2539
2540 if (warnings.size() != 0u)
2541 info[jss::warnings] = std::move(warnings);
2542 }
2543
2544 // hostid: unique string describing the machine
2545 if (human)
2546 info[jss::hostid] = getHostId(admin);
2547
2548 // domain: if configured with a domain, report it:
2549 if (!registry_.get().getApp().config().SERVER_DOMAIN.empty())
2550 info[jss::server_domain] = registry_.get().getApp().config().SERVER_DOMAIN;
2551
2552 info[jss::build_version] = BuildInfo::getVersionString();
2553
2554 info[jss::server_state] = strOperatingMode(admin);
2555
2556 info[jss::time] =
2557 to_string(std::chrono::floor<std::chrono::microseconds>(std::chrono::system_clock::now()));
2558
// NOTE(review): the condition guarding this assignment (line 2559) is
// missing from this listing — confirm upstream.
2560 info[jss::network_ledger] = "waiting";
2561
2562 info[jss::validation_quorum] =
2563 static_cast<Json::UInt>(registry_.get().getValidators().quorum());
2564
2565 if (admin)
2566 {
2567 // Note: By default the node size is "tiny". When parsing it's an error if the final
2568 // NODE_SIZE is over 4 so below code should be safe.
2569 // NOLINTNEXTLINE(bugprone-switch-missing-default-case)
2570 switch (registry_.get().getApp().config().NODE_SIZE)
2571 {
2572 case 0:
2573 info[jss::node_size] = "tiny";
2574 break;
2575 case 1:
2576 info[jss::node_size] = "small";
2577 break;
2578 case 2:
2579 info[jss::node_size] = "medium";
2580 break;
2581 case 3:
2582 info[jss::node_size] = "large";
2583 break;
2584 case 4:
2585 info[jss::node_size] = "huge";
2586 break;
2587 }
2588
2589 auto when = registry_.get().getValidators().expires();
2590
2591 if (!human)
2592 {
// Machine form: seconds-since-epoch count, 0 when unknown.
2593 if (when)
2594 {
2595 info[jss::validator_list_expires] =
2596 safe_cast<Json::UInt>(when->time_since_epoch().count());
2597 }
2598 else
2599 {
2600 info[jss::validator_list_expires] = 0;
2601 }
2602 }
2603 else
2604 {
// Human form: nested validator_list object with count/expiration/status.
2605 auto& x = (info[jss::validator_list] = Json::objectValue);
2606
2607 x[jss::count] = static_cast<Json::UInt>(registry_.get().getValidators().count());
2608
2609 if (when)
2610 {
2611 if (*when == TimeKeeper::time_point::max())
2612 {
2613 x[jss::expiration] = "never";
2614 x[jss::status] = "active";
2615 }
2616 else
2617 {
2618 x[jss::expiration] = to_string(*when);
2619
2620 if (*when > registry_.get().getTimeKeeper().now())
2621 {
2622 x[jss::status] = "active";
2623 }
2624 else
2625 {
2626 x[jss::status] = "expired";
2627 }
2628 }
2629 }
2630 else
2631 {
2632 x[jss::status] = "unknown";
2633 x[jss::expiration] = "unknown";
2634 }
2635 }
2636
// Report build provenance when embedded at compile time.
2637 if (!xrpl::git::getCommitHash().empty() || !xrpl::git::getBuildBranch().empty())
2638 {
2639 auto& x = (info[jss::git] = Json::objectValue);
2641 x[jss::hash] = xrpl::git::getCommitHash();
2643 x[jss::branch] = xrpl::git::getBuildBranch();
2644 }
2645 }
2646 info[jss::io_latency_ms] =
2647 static_cast<Json::UInt>(registry_.get().getApp().getIOLatency().count());
2648
2649 if (admin)
2650 {
2651 if (auto const localPubKey = registry_.get().getValidators().localPublicKey();
2652 localPubKey && registry_.get().getApp().getValidationPublicKey())
2653 {
2654 info[jss::pubkey_validator] = toBase58(TokenType::NodePublic, localPubKey.value())
;
2655 }
2656 else
2657 {
2658 info[jss::pubkey_validator] = "none";
2659 }
2660 }
2661
2662 if (counters)
2663 {
2664 info[jss::counters] = registry_.get().getPerfLog().countersJson();
2665
2666 Json::Value nodestore(Json::objectValue);
2667 registry_.get().getNodeStore().getCountsJson(nodestore);
2668 info[jss::counters][jss::nodestore] = nodestore;
2669 info[jss::current_activities] = registry_.get().getPerfLog().currentJson();
2670 }
2671
2672 info[jss::pubkey_node] =
2673 toBase58(TokenType::NodePublic, registry_.get().getApp().nodeIdentity().first);
2674
2675 info[jss::complete_ledgers] = registry_.get().getLedgerMaster().getCompleteLedgers();
2676
// NOTE(review): the condition guarding this assignment (line 2677) is
// missing from this listing — confirm upstream.
2678 info[jss::amendment_blocked] = true;
2679
2680 auto const fp = m_ledgerMaster.getFetchPackCacheSize();
2681
2682 if (fp != 0)
2683 info[jss::fetch_pack] = Json::UInt(fp);
2684
2685 info[jss::peers] = Json::UInt(registry_.get().getOverlay().size());
2686
2687 Json::Value lastClose = Json::objectValue;
2688 lastClose[jss::proposers] = Json::UInt(mConsensus.prevProposers());
2689
2690 if (human)
2691 {
// NOTE(review): the right-hand side of converge_time_s (line 2693) is
// missing from this listing — confirm upstream.
2692 lastClose[jss::converge_time_s] =
2694 }
2695 else
2696 {
2697 lastClose[jss::converge_time] = Json::Int(mConsensus.prevRoundTime().count());
2698 }
2699
2700 info[jss::last_close] = lastClose;
2701
2702 // info[jss::consensus] = mConsensus.getJson();
2703
2704 if (admin)
2705 info[jss::load] = m_job_queue.getJson();
2706
2707 if (auto const netid = registry_.get().getOverlay().networkID())
2708 info[jss::network_id] = static_cast<Json::UInt>(*netid);
2709
2710 auto const escalationMetrics =
2711 registry_.get().getTxQ().getMetrics(*registry_.get().getOpenLedger().current());
2712
2713 auto const loadFactorServer = registry_.get().getFeeTrack().getLoadFactor();
2714 auto const loadBaseServer = registry_.get().getFeeTrack().getLoadBase();
2715 /* Scale the escalated fee level to unitless "load factor".
2716 In practice, this just strips the units, but it will continue
2717 to work correctly if either base value ever changes. */
// NOTE(review): the tail of this mulDiv expression (line 2722, presumably
// the fallback/terminator for the optional result) is missing — confirm.
2718 auto const loadFactorFeeEscalation = mulDiv(
2719 escalationMetrics.openLedgerFeeLevel,
2720 loadBaseServer,
2721 escalationMetrics.referenceFeeLevel)
2723
2724 auto const loadFactor =
2725 std::max(safe_cast<std::uint64_t>(loadFactorServer), loadFactorFeeEscalation);
2726
2727 if (!human)
2728 {
2729 info[jss::load_base] = loadBaseServer;
2730 info[jss::load_factor] = trunc32(loadFactor);
2731 info[jss::load_factor_server] = loadFactorServer;
2732
2733 /* Json::Value doesn't support uint64, so clamp to max
2734 uint32 value. This is mostly theoretical, since there
2735 probably isn't enough extant XRP to drive the factor
2736 that high.
2737 */
2738 info[jss::load_factor_fee_escalation] = escalationMetrics.openLedgerFeeLevel.jsonClipped();
2739 info[jss::load_factor_fee_queue] = escalationMetrics.minProcessingFeeLevel.jsonClipped();
2740 info[jss::load_factor_fee_reference] = escalationMetrics.referenceFeeLevel.jsonClipped();
2741 }
2742 else
2743 {
// Human form: report factors as ratios against the base.
2744 info[jss::load_factor] = static_cast<double>(loadFactor) / loadBaseServer;
2745
2746 if (loadFactorServer != loadFactor)
2747 info[jss::load_factor_server] = static_cast<double>(loadFactorServer) / loadBaseServer;
2748
2749 if (admin)
2750 {
2751 std::uint32_t fee = registry_.get().getFeeTrack().getLocalFee();
2752 if (fee != loadBaseServer)
2753 info[jss::load_factor_local] = static_cast<double>(fee) / loadBaseServer;
2754 fee = registry_.get().getFeeTrack().getRemoteFee();
2755 if (fee != loadBaseServer)
2756 info[jss::load_factor_net] = static_cast<double>(fee) / loadBaseServer;
2757 fee = registry_.get().getFeeTrack().getClusterFee();
2758 if (fee != loadBaseServer)
2759 info[jss::load_factor_cluster] = static_cast<double>(fee) / loadBaseServer;
2760 }
2761 if (escalationMetrics.openLedgerFeeLevel != escalationMetrics.referenceFeeLevel &&
2762 (admin || loadFactorFeeEscalation != loadFactor))
2763 {
2764 info[jss::load_factor_fee_escalation] =
2765 escalationMetrics.openLedgerFeeLevel.decimalFromReference(
2766 escalationMetrics.referenceFeeLevel);
2767 }
2768 if (escalationMetrics.minProcessingFeeLevel != escalationMetrics.referenceFeeLevel)
2769 {
2770 info[jss::load_factor_fee_queue] =
2771 escalationMetrics.minProcessingFeeLevel.decimalFromReference(
2772 escalationMetrics.referenceFeeLevel);
2773 }
2774 }
2775
// Prefer the validated ledger; fall back to the last closed ledger.
2776 bool valid = false;
2777 auto lpClosed = m_ledgerMaster.getValidatedLedger();
2778
2779 if (lpClosed)
2780 {
2781 valid = true;
2782 }
2783 else
2784 {
2785 lpClosed = m_ledgerMaster.getClosedLedger();
2786 }
2787
2788 if (lpClosed)
2789 {
// NOTE(review): the declaration of `l` (line 2791, presumably a
// Json::Value object) is missing from this listing — confirm upstream.
2790 XRPAmount const baseFee = lpClosed->fees().base;
2792 l[jss::seq] = Json::UInt(lpClosed->header().seq);
2793 l[jss::hash] = to_string(lpClosed->header().hash);
2794
2795 if (!human)
2796 {
2797 l[jss::base_fee] = baseFee.jsonClipped();
2798 l[jss::reserve_base] = lpClosed->fees().reserve.jsonClipped();
2799 l[jss::reserve_inc] = lpClosed->fees().increment.jsonClipped();
2800 l[jss::close_time] =
2801 Json::Value::UInt(lpClosed->header().closeTime.time_since_epoch().count());
2802 }
2803 else
2804 {
2805 l[jss::base_fee_xrp] = baseFee.decimalXRP();
2806 l[jss::reserve_base_xrp] = lpClosed->fees().reserve.decimalXRP();
2807 l[jss::reserve_inc_xrp] = lpClosed->fees().increment.decimalXRP();
2808
// Only report a clock offset once it is large enough to matter (>= 60s).
2809 if (auto const closeOffset = registry_.get().getTimeKeeper().closeOffset();
2810 std::abs(closeOffset.count()) >= 60)
2811 l[jss::close_time_offset] = static_cast<std::uint32_t>(closeOffset.count());
2812
// NOTE(review): the condition opening this branch (line 2814) is missing
// from this listing — confirm upstream.
2813 constexpr std::chrono::seconds highAgeThreshold{1000000};
2815 {
2816 auto const age = m_ledgerMaster.getValidatedLedgerAge();
2817 l[jss::age] = Json::UInt(age < highAgeThreshold ? age.count() : 0);
2818 }
2819 else
2820 {
2821 auto lCloseTime = lpClosed->header().closeTime;
2822 auto closeTime = registry_.get().getTimeKeeper().closeTime();
2823 if (lCloseTime <= closeTime)
2824 {
2825 using namespace std::chrono_literals;
2826 auto age = closeTime - lCloseTime;
2827 l[jss::age] = Json::UInt(age < highAgeThreshold ? age.count() : 0);
2828 }
2829 }
2830 }
2831
2832 if (valid)
2833 {
2834 info[jss::validated_ledger] = l;
2835 }
2836 else
2837 {
2838 info[jss::closed_ledger] = l;
2839 }
2840
2841 auto lpPublished = m_ledgerMaster.getPublishedLedger();
2842 if (!lpPublished)
2843 {
2844 info[jss::published_ledger] = "none";
2845 }
2846 else if (lpPublished->header().seq != lpClosed->header().seq)
2847 {
2848 info[jss::published_ledger] = lpPublished->header().seq;
2849 }
2850 }
2851
2852 accounting_.json(info);
2853 info[jss::uptime] = UptimeClock::now().time_since_epoch().count();
2854 info[jss::jq_trans_overflow] =
2855 std::to_string(registry_.get().getOverlay().getJqTransOverflow());
2856 info[jss::peer_disconnects] = std::to_string(registry_.get().getOverlay().getPeerDisconnect());
2857 info[jss::peer_disconnects_resources] =
2858 std::to_string(registry_.get().getOverlay().getPeerDisconnectCharges());
2859
2860 // This array must be sorted in increasing order.
2861 static constexpr std::array<std::string_view, 7> protocols{
2862 "http", "https", "peer", "ws", "ws2", "wss", "wss2"};
2863 static_assert(std::is_sorted(std::begin(protocols), std::end(protocols)));
2864 {
// NOTE(review): the declaration of `ports` (line 2865) and the declaration
// of `proto` plus the set_intersection call head (lines 2873-2874) are
// missing from this listing — confirm upstream.
2866 for (auto const& port : registry_.get().getServerHandler().setup().ports)
2867 {
2868 // Don't publish admin ports for non-admin users
2869 if (!admin &&
2870 !(port.admin_nets_v4.empty() && port.admin_nets_v6.empty() &&
2871 port.admin_user.empty() && port.admin_password.empty()))
2872 continue;
2875 std::begin(port.protocol),
2876 std::end(port.protocol),
2877 std::begin(protocols),
2878 std::end(protocols),
2879 std::back_inserter(proto));
2880 if (!proto.empty())
2881 {
2882 auto& jv = ports.append(Json::Value(Json::objectValue));
2883 jv[jss::port] = std::to_string(port.port);
2884 jv[jss::protocol] = Json::Value{Json::arrayValue};
2885 for (auto const& p : proto)
2886 jv[jss::protocol].append(p);
2887 }
2888 }
2889
// Advertise the gRPC port when configured with both port and ip.
2890 if (registry_.get().getApp().config().exists(SECTION_PORT_GRPC))
2891 {
2892 auto const& grpcSection = registry_.get().getApp().config().section(SECTION_PORT_GRPC);
2893 auto const optPort = grpcSection.get("port");
2894 if (optPort && grpcSection.get("ip"))
2895 {
2896 auto& jv = ports.append(Json::Value(Json::objectValue));
2897 jv[jss::port] = *optPort;
2898 jv[jss::protocol] = Json::Value{Json::arrayValue};
2899 jv[jss::protocol].append("grpc");
2900 }
2901 }
2902 info[jss::ports] = std::move(ports);
2903 }
2904
2905 return info;
2906}
2907
// Reset the record of failed inbound-ledger acquisitions.
// NOTE(review): the declarator line (2909) is missing from this listing.
2908 void
2910 {
2911 registry_.get().getInboundLedgers().clearFailures();
2912}
2913
2916{
2917 return registry_.get().getInboundLedgers().getInfo();
2918}
2919
// Publish a proposed (not yet validated) transaction to subscribers of the
// real-time transactions stream, then fan out to per-account RT subscribers.
// NOTE(review): the declarator line (2921) is missing from this listing —
// presumably NetworkOPsImp::pubProposedTransaction.
2920 void
2922 std::shared_ptr<ReadView const> const& ledger,
2923 std::shared_ptr<STTx const> const& transaction,
2924 TER result)
2925 {
2926 // never publish an inner txn inside a batch txn. The flag should
2927 // only be set if the Batch feature is enabled. If Batch is not
2928 // enabled, the flag is always invalid, so don't publish it
2929 // regardless.
2930 if (transaction->isFlag(tfInnerBatchTxn))
2931 return;
2932
// Build the event once; transJson produces per-API-version variants.
2933 MultiApiJson jvObj = transJson(transaction, result, false, ledger, std::nullopt);
2934
2935 {
2936 std::lock_guard const sl(mSubLock);
2937
2938 auto it = mStreamMaps[sRTTransactions].begin();
2939 while (it != mStreamMaps[sRTTransactions].end())
2940 {
2941 InfoSub::pointer p = it->second.lock();
2942
2943 if (p)
2944 {
// Send the JSON variant matching this subscriber's API version.
2945 jvObj.visit(
2946 p->getApiVersion(), //
2947 [&](Json::Value const& jv) { p->send(jv, true); });
2948 ++it;
2949 }
2950 else
2951 {
// Prune subscriptions whose client has gone away.
2952 it = mStreamMaps[sRTTransactions].erase(it);
2953 }
2954 }
2955 }
2956
2957 pubProposedAccountTransaction(ledger, transaction, result);
2958}
2959
// Publish a newly accepted/validated ledger: emit "ledgerClosed" events to
// ledger-stream subscribers, book changes to the book-changes stream, kick
// off any pending account-history subscriptions on the first ledger seen,
// and finally publish each transaction in the ledger.
// NOTE(review): this listing has numbering gaps (2961, 2966, 2987, 3005):
// the declarator, the declaration of `alpAccepted`, the declaration of
// `jvObj`, and one guarding condition are missing — confirm upstream.
2960 void
2962 {
2963 // Ledgers are published only when they acquire sufficient validations
2964 // Holes are filled across connection loss or other catastrophe
2965
// Reuse the cached AcceptedLedger when available; otherwise build and cache.
2967 registry_.get().getAcceptedLedgerCache().fetch(lpAccepted->header().hash);
2968 if (!alpAccepted)
2969 {
2970 alpAccepted = std::make_shared<AcceptedLedger>(lpAccepted);
2971 registry_.get().getAcceptedLedgerCache().canonicalize_replace_client(
2972 lpAccepted->header().hash, alpAccepted);
2973 }
2974
2975 XRPL_ASSERT(
2976 alpAccepted->getLedger().get() == lpAccepted.get(),
2977 "xrpl::NetworkOPsImp::pubLedger : accepted input");
2978
2979 {
2980 JLOG(m_journal.debug()) << "Publishing ledger " << lpAccepted->header().seq << " "
2981 << lpAccepted->header().hash;
2982
2983 std::lock_guard const sl(mSubLock);
2984
2985 if (!mStreamMaps[sLedger].empty())
2986 {
2988
2989 jvObj[jss::type] = "ledgerClosed";
2990 jvObj[jss::ledger_index] = lpAccepted->header().seq;
2991 jvObj[jss::ledger_hash] = to_string(lpAccepted->header().hash);
2992 jvObj[jss::ledger_time] =
2993 Json::Value::UInt(lpAccepted->header().closeTime.time_since_epoch().count());
2994
2995 jvObj[jss::network_id] = registry_.get().getNetworkIDService().getNetworkID();
2996
// fee_ref is only reported on networks without the XRPFees amendment.
2997 if (!lpAccepted->rules().enabled(featureXRPFees))
2998 jvObj[jss::fee_ref] = FEE_UNITS_DEPRECATED;
2999 jvObj[jss::fee_base] = lpAccepted->fees().base.jsonClipped();
3000 jvObj[jss::reserve_base] = lpAccepted->fees().reserve.jsonClipped();
3001 jvObj[jss::reserve_inc] = lpAccepted->fees().increment.jsonClipped();
3002
3003 jvObj[jss::txn_count] = Json::UInt(alpAccepted->size());
3004
// NOTE(review): the condition for this block (line 3005) is missing from
// this listing — confirm upstream.
3006 {
3007 jvObj[jss::validated_ledgers] =
3008 registry_.get().getLedgerMaster().getCompleteLedgers();
3009 }
3010
3011 auto it = mStreamMaps[sLedger].begin();
3012 while (it != mStreamMaps[sLedger].end())
3013 {
3014 InfoSub::pointer const p = it->second.lock();
3015 if (p)
3016 {
3017 p->send(jvObj, true);
3018 ++it;
3019 }
3020 else
3021 {
// Prune subscriptions whose client has gone away.
3022 it = mStreamMaps[sLedger].erase(it);
3023 }
3024 }
3025 }
3026
3027 if (!mStreamMaps[sBookChanges].empty())
3028 {
3029 Json::Value const jvObj = xrpl::RPC::computeBookChanges(lpAccepted);
3030
3031 auto it = mStreamMaps[sBookChanges].begin();
3032 while (it != mStreamMaps[sBookChanges].end())
3033 {
3034 InfoSub::pointer const p = it->second.lock();
3035 if (p)
3036 {
3037 p->send(jvObj, true);
3038 ++it;
3039 }
3040 else
3041 {
3042 it = mStreamMaps[sBookChanges].erase(it);
3043 }
3044 }
3045 }
3046
3047 {
3048 static bool firstTime = true;
3049 if (firstTime)
3050 {
3051 // First validated ledger, start delayed SubAccountHistory
3052 firstTime = false;
3053 for (auto& outer : mSubAccountHistory)
3054 {
3055 for (auto& inner : outer.second)
3056 {
3057 auto& subInfo = inner.second;
3058 if (subInfo.index_->separationLedgerSeq_ == 0)
3059 {
3060 subAccountHistoryStart(alpAccepted->getLedger(), subInfo);
3061 }
3062 }
3063 }
3064 }
3065 }
3066 }
3067
3068 // Don't lock since pubAcceptedTransaction is locking.
3069 for (auto const& accTx : *alpAccepted)
3070 {
3071 JLOG(m_journal.trace()) << "pubAccepted: " << accTx->getJson();
// The last transaction of the ledger marks the account-history boundary.
3072 pubValidatedTransaction(lpAccepted, *accTx, accTx == *(--alpAccepted->end()));
3073 }
3074}
3075
// Detect a change in the advertised fee/load summary and, if anything
// changed since the last publication, schedule a job to push the new server
// state to subscribers via pubServer().
// NOTE(review): the declarator line (3077) is missing from this listing.
3076 void
3078 {
// Snapshot the current base fee, TxQ metrics and load-fee state.
3079 ServerFeeSummary const f{
3080 registry_.get().getOpenLedger().current()->fees().base,
3081 registry_.get().getTxQ().getMetrics(*registry_.get().getOpenLedger().current()),
3082 registry_.get().getFeeTrack()};
3083
3084 // only schedule the job if something has changed
3085 if (f != mLastFeeSummary)
3086 {
3087 m_job_queue.addJob(jtCLIENT_FEE_CHANGE, "PubFee", [this]() { pubServer(); });
3088 }
3089}
3090
// Schedule asynchronous publication of a consensus phase change to the
// consensus stream (work is done on the job queue, not the caller's thread).
// NOTE(review): the declarator line (3092) is missing from this listing.
3091 void
3093 {
3094 m_job_queue.addJob(jtCLIENT_CONSENSUS, "PubCons", [this, phase]() { pubConsensus(phase); });
3095}
3096
// Sweep the locally-submitted transaction set against `view`, discarding
// entries that can no longer apply.
// NOTE(review): the declarator line (3098) is missing from this listing.
3097 inline void
3099 {
3100 m_localTX->sweep(view);
3101}
// Number of transactions currently held in the local transaction set.
// NOTE(review): the declarator line (3103) is missing from this listing.
3102 inline std::size_t
3104 {
3105 return m_localTX->size();
3106}
3107
3108 // This routine should only be used to publish accepted or validated
3109 // transactions.
// Builds the "transaction" stream event for one transaction: engine result,
// tx JSON, optional metadata (with delivered_amount / NFT / MPT synthetic
// fields), CTID where derivable, ledger linkage, and — for offer creates
// funded by a third party — the owner's available funds. The result is a
// MultiApiJson holding one variant per supported API version.
// NOTE(review): this listing has numbering gaps (3110-3111, 3116, 3118,
// 3190): the return-type/declarator lines, the `meta` parameter, the
// declaration of `jvObj`, and the line preceding `multiObj.visit(), //`
// (which as shown does not parse) are missing — confirm upstream.
3112 std::shared_ptr<STTx const> const& transaction,
3113 TER result,
3114 bool validated,
3115 std::shared_ptr<ReadView const> const& ledger,
3117 {
3119 std::string sToken;
3120 std::string sHuman;
3121
3122 transResultInfo(result, sToken, sHuman);
3123
3124 jvObj[jss::type] = "transaction";
3125 // NOTE jvObj is not a finished object for either API version. After
3126 // it's populated, we need to finish it for a specific API version. This is
3127 // done in a loop, near the end of this function.
3128 jvObj[jss::transaction] = transaction->getJson(JsonOptions::disable_API_prior_V2, false);
3129
3130 if (meta)
3131 {
3132 jvObj[jss::meta] = meta->get().getJson(JsonOptions::none);
3133 RPC::insertDeliveredAmount(jvObj[jss::meta], *ledger, transaction, meta->get());
3134 RPC::insertNFTSyntheticInJson(jvObj, transaction, meta->get());
3135 RPC::insertMPTokenIssuanceID(jvObj[jss::meta], transaction, meta->get());
3136 }
3137
3138 // add CTID where the needed data for it exists
3139 if (auto const& lookup = ledger->txRead(transaction->getTransactionID());
3140 lookup.second && lookup.second->isFieldPresent(sfTransactionIndex))
3141 {
3142 uint32_t const txnSeq = lookup.second->getFieldU32(sfTransactionIndex);
// The tx's own NetworkID field (if present) wins over the service's ID.
3143 uint32_t netID = registry_.get().getNetworkIDService().getNetworkID();
3144 if (transaction->isFieldPresent(sfNetworkID))
3145 netID = transaction->getFieldU32(sfNetworkID);
3146
3147 if (std::optional<std::string> ctid = RPC::encodeCTID(ledger->header().seq, txnSeq, netID);
3148 ctid)
3149 jvObj[jss::ctid] = *ctid;
3150 }
// A closed ledger has a meaningful hash; an open ledger does not.
3151 if (!ledger->open())
3152 jvObj[jss::ledger_hash] = to_string(ledger->header().hash);
3153
3154 if (validated)
3155 {
3156 jvObj[jss::ledger_index] = ledger->header().seq;
3157 jvObj[jss::transaction][jss::date] = ledger->header().closeTime.time_since_epoch().count();
3158 jvObj[jss::validated] = true;
3159 jvObj[jss::close_time_iso] = to_string_iso(ledger->header().closeTime);
3160
3161 // WRITEME: Put the account next seq here
3162 }
3163 else
3164 {
3165 jvObj[jss::validated] = false;
3166 jvObj[jss::ledger_current_index] = ledger->header().seq;
3167 }
3168
3169 jvObj[jss::status] = validated ? "closed" : "proposed";
3170 jvObj[jss::engine_result] = sToken;
3171 jvObj[jss::engine_result_code] = result;
3172 jvObj[jss::engine_result_message] = sHuman;
3173
3174 if (transaction->getTxnType() == ttOFFER_CREATE)
3175 {
3176 auto const account = transaction->getAccountID(sfAccount);
3177 auto const amount = transaction->getFieldAmount(sfTakerGets);
3178
3179 // If the offer create is not self funded then add the owner balance
3180 if (account != amount.issue().account)
3181 {
3182 auto const ownerFunds = accountFunds(
3183 *ledger, account, amount, fhIGNORE_FREEZE, registry_.get().getJournal("View"));
3184 jvObj[jss::transaction][jss::owner_funds] = ownerFunds.getText();
3185 }
3186 }
3187
// Finish the object per API version: v2+ moves the tx under tx_json and
// hoists the hash to the top level; v1 keeps the hash inside "transaction".
3188 std::string const hash = to_string(transaction->getTransactionID());
3189 MultiApiJson multiObj{jvObj};
3191 multiObj.visit(), //
3192 [&]<unsigned Version>(Json::Value& jvTx, std::integral_constant<unsigned, Version>) {
3193 RPC::insertDeliverMax(jvTx[jss::transaction], transaction->getTxnType(), Version);
3194
3195 if constexpr (Version > 1)
3196 {
3197 jvTx[jss::tx_json] = jvTx.removeMember(jss::transaction);
3198 jvTx[jss::hash] = hash;
3199 }
3200 else
3201 {
3202 jvTx[jss::transaction][jss::hash] = hash;
3203 }
3204 });
3205
3206 return multiObj;
3207}
3208
// Publish one validated transaction: send it (with metadata) to both the
// transactions and real-time-transactions streams, feed successful ones to
// the order book tracker, then fan out to per-account subscribers. `last`
// marks the final transaction of the ledger (account-history boundary).
// NOTE(review): the declarator line (3210) is missing from this listing —
// presumably NetworkOPsImp::pubValidatedTransaction.
3209 void
3211 std::shared_ptr<ReadView const> const& ledger,
3212 AcceptedLedgerTx const& transaction,
3213 bool last)
3214 {
3215 auto const& stTxn = transaction.getTxn();
3216
3217 // Create two different Json objects, for different API versions
3218 auto const metaRef = std::ref(transaction.getMeta());
3219 auto const trResult = transaction.getResult();
3220 MultiApiJson jvObj = transJson(stTxn, trResult, true, ledger, metaRef);
3221
3222 {
3223 std::lock_guard const sl(mSubLock);
3224
3225 auto it = mStreamMaps[sTransactions].begin();
3226 while (it != mStreamMaps[sTransactions].end())
3227 {
3228 InfoSub::pointer p = it->second.lock();
3229
3230 if (p)
3231 {
// Send the variant matching this subscriber's API version.
3232 jvObj.visit(
3233 p->getApiVersion(), //
3234 [&](Json::Value const& jv) { p->send(jv, true); });
3235 ++it;
3236 }
3237 else
3238 {
// Prune subscriptions whose client has gone away.
3239 it = mStreamMaps[sTransactions].erase(it);
3240 }
3241 }
3242
3243 it = mStreamMaps[sRTTransactions].begin();
3244
3245 while (it != mStreamMaps[sRTTransactions].end())
3246 {
3247 InfoSub::pointer p = it->second.lock();
3248
3249 if (p)
3250 {
3251 jvObj.visit(
3252 p->getApiVersion(), //
3253 [&](Json::Value const& jv) { p->send(jv, true); });
3254 ++it;
3255 }
3256 else
3257 {
3258 it = mStreamMaps[sRTTransactions].erase(it);
3259 }
3260 }
3261 }
3262
// Order books only change when the transaction actually succeeded.
3263 if (transaction.getResult() == tesSUCCESS)
3264 registry_.get().getOrderBookDB().processTxn(ledger, transaction, jvObj);
3265
3266 pubAccountTransaction(ledger, transaction, last);
3267}
3268
// Fan a validated transaction out to per-account subscribers: real-time
// account subscriptions, plain account subscriptions, and account-history
// subscriptions (which carry forward-index bookkeeping). Dead subscriber
// entries are pruned while scanning. `last` marks the final transaction of
// the ledger and sets account_history_boundary on history events.
// NOTE(review): numbering gaps at 3270, 3275 and 3283-3284: the declarator,
// the declaration of `notify` (presumably a set of InfoSub pointers), and
// the condition opening the affected-accounts scan are missing — confirm
// upstream.
3269 void
3271 std::shared_ptr<ReadView const> const& ledger,
3272 AcceptedLedgerTx const& transaction,
3273 bool last)
3274 {
3276 int iProposed = 0;
3277 int iAccepted = 0;
3278
3279 std::vector<SubAccountHistoryInfo> accountHistoryNotify;
3280 auto const currLedgerSeq = ledger->seq();
3281 {
3282 std::lock_guard const sl(mSubLock);
3284
3285 {
3286 for (auto const& affectedAccount : transaction.getAffected())
3287 {
// Real-time subscribers for this account.
3288 if (auto simiIt = mSubRTAccount.find(affectedAccount);
3289 simiIt != mSubRTAccount.end())
3290 {
3291 auto it = simiIt->second.begin();
3292
3293 while (it != simiIt->second.end())
3294 {
3295 InfoSub::pointer const p = it->second.lock();
3296
3297 if (p)
3298 {
3299 notify.insert(p);
3300 ++it;
3301 ++iProposed;
3302 }
3303 else
3304 {
3305 it = simiIt->second.erase(it);
3306 }
3307 }
3308 }
3309
// Plain (validated-only) subscribers for this account.
3310 if (auto simiIt = mSubAccount.find(affectedAccount); simiIt != mSubAccount.end())
3311 {
3312 auto it = simiIt->second.begin();
3313 while (it != simiIt->second.end())
3314 {
3315 InfoSub::pointer const p = it->second.lock();
3316
3317 if (p)
3318 {
3319 notify.insert(p);
3320 ++it;
3321 ++iAccepted;
3322 }
3323 else
3324 {
3325 it = simiIt->second.erase(it);
3326 }
3327 }
3328 }
3329
// Account-history subscribers: skip ledgers at or before the separation
// point (those are delivered by the backfill job instead).
3330 if (auto historyIt = mSubAccountHistory.find(affectedAccount);
3331 historyIt != mSubAccountHistory.end())
3332 {
3333 auto& subs = historyIt->second;
3334 auto it = subs.begin();
3335 while (it != subs.end())
3336 {
3337 SubAccountHistoryInfoWeak const& info = it->second;
3338 if (currLedgerSeq <= info.index_->separationLedgerSeq_)
3339 {
3340 ++it;
3341 continue;
3342 }
3343
3344 if (auto isSptr = info.sinkWptr_.lock(); isSptr)
3345 {
3346 accountHistoryNotify.emplace_back(
3347 SubAccountHistoryInfo{isSptr, info.index_});
3348 ++it;
3349 }
3350 else
3351 {
3352 it = subs.erase(it);
3353 }
3354 }
3355 if (subs.empty())
3356 mSubAccountHistory.erase(historyIt);
3357 }
3358 }
3359 }
3360 }
3361
3362 JLOG(m_journal.trace()) << "pubAccountTransaction: "
3363 << "proposed=" << iProposed << ", accepted=" << iAccepted;
3364
3365 if (!notify.empty() || !accountHistoryNotify.empty())
3366 {
3367 auto const& stTxn = transaction.getTxn();
3368
3369 // Create two different Json objects, for different API versions
3370 auto const metaRef = std::ref(transaction.getMeta());
3371 auto const trResult = transaction.getResult();
3372 MultiApiJson jvObj = transJson(stTxn, trResult, true, ledger, metaRef);
3373
3374 for (InfoSub::ref isrListener : notify)
3375 {
3376 jvObj.visit(
3377 isrListener->getApiVersion(), //
3378 [&](Json::Value const& jv) { isrListener->send(jv, true); });
3379 }
3380
// History events additionally carry boundary/first/index markers.
3381 if (last)
3382 jvObj.set(jss::account_history_boundary, true);
3383
3384 XRPL_ASSERT(
3385 jvObj.isMember(jss::account_history_tx_stream) == MultiApiJson::none,
3386 "xrpl::NetworkOPsImp::pubAccountTransaction : "
3387 "account_history_tx_stream not set");
3388 for (auto& info : accountHistoryNotify)
3389 {
3390 auto& index = info.index_;
3391 if (index->forwardTxIndex_ == 0 && !index->haveHistorical_)
3392 jvObj.set(jss::account_history_tx_first, true);
3393
3394 jvObj.set(jss::account_history_tx_index, index->forwardTxIndex_++);
3395
3396 jvObj.visit(
3397 info.sink_->getApiVersion(), //
3398 [&](Json::Value const& jv) { info.sink_->send(jv, true); });
3399 }
3400 }
3401}
3402
// Fan a proposed (unvalidated) transaction out to real-time per-account
// subscribers of every account the transaction mentions. Dead subscriber
// entries are pruned while scanning. accountHistoryNotify is populated
// nowhere in the visible code, so the history loop below appears to be a
// structural twin of pubAccountTransaction's.
// NOTE(review): numbering gaps at 3404, 3406, 3409 and 3420: the declarator,
// the `tx` parameter, the declaration of `notify`, and the line before the
// bare scope at 3421 are missing — confirm upstream.
3403 void
3405 std::shared_ptr<ReadView const> const& ledger,
3407 TER result)
3408 {
3410 int iProposed = 0;
3411
3412 std::vector<SubAccountHistoryInfo> accountHistoryNotify;
3413
3414 {
3415 std::lock_guard const sl(mSubLock);
3416
// Fast exit: nobody subscribes to real-time account events.
3417 if (mSubRTAccount.empty())
3418 return;
3419
3421 {
3422 for (auto const& affectedAccount : tx->getMentionedAccounts())
3423 {
3424 if (auto simiIt = mSubRTAccount.find(affectedAccount);
3425 simiIt != mSubRTAccount.end())
3426 {
3427 auto it = simiIt->second.begin();
3428
3429 while (it != simiIt->second.end())
3430 {
3431 InfoSub::pointer const p = it->second.lock();
3432
3433 if (p)
3434 {
3435 notify.insert(p);
3436 ++it;
3437 ++iProposed;
3438 }
3439 else
3440 {
3441 it = simiIt->second.erase(it);
3442 }
3443 }
3444 }
3445 }
3446 }
3447 }
3448
3449 JLOG(m_journal.trace()) << "pubProposedAccountTransaction: " << iProposed;
3450
3451 if (!notify.empty() || !accountHistoryNotify.empty())
3452 {
3453 // Create two different Json objects, for different API versions
3454 MultiApiJson jvObj = transJson(tx, result, false, ledger, std::nullopt);
3455
3456 for (InfoSub::ref isrListener : notify)
3457 {
3458 jvObj.visit(
3459 isrListener->getApiVersion(), //
3460 [&](Json::Value const& jv) { isrListener->send(jv, true); });
3461 }
3462
3463 XRPL_ASSERT(
3464 jvObj.isMember(jss::account_history_tx_stream) == MultiApiJson::none,
3465 "xrpl::NetworkOPs::pubProposedAccountTransaction : "
3466 "account_history_tx_stream not set");
3467 for (auto& info : accountHistoryNotify)
3468 {
3469 auto& index = info.index_;
3470 if (index->forwardTxIndex_ == 0 && !index->haveHistorical_)
3471 jvObj.set(jss::account_history_tx_first, true);
3472 jvObj.set(jss::account_history_tx_index, index->forwardTxIndex_++);
3473 jvObj.visit(
3474 info.sink_->getApiVersion(), //
3475 [&](Json::Value const& jv) { info.sink_->send(jv, true); });
3476 }
3477 }
3478}
3479
3480//
3481// Monitoring
3482//
3483
// Subscribe `isrListener` to events for each account in `vnaAccountIDs`.
// `rt` selects the real-time map (mSubRTAccount) versus the validated-only
// map (mSubAccount). The listener records its own subscriptions first so it
// can clean them up on disconnect.
// NOTE(review): the declarator line (3485) is missing from this listing —
// presumably NetworkOPsImp::subAccount.
3484 void
3486 InfoSub::ref isrListener,
3487 hash_set<AccountID> const& vnaAccountIDs,
3488 bool rt)
3489 {
3490 SubInfoMapType& subMap = rt ? mSubRTAccount : mSubAccount;
3491
// Record the subscription on the listener side (outside the lock).
3492 for (auto const& naAccountID : vnaAccountIDs)
3493 {
3494 JLOG(m_journal.trace()) << "subAccount: account: " << toBase58(naAccountID);
3495
3496 isrListener->insertSubAccountInfo(naAccountID, rt);
3497 }
3498
3499 std::lock_guard const sl(mSubLock);
3500
3501 for (auto const& naAccountID : vnaAccountIDs)
3502 {
3503 auto simIterator = subMap.find(naAccountID);
3504 if (simIterator == subMap.end())
3505 {
3506 // Not found, note that account has a new single listener.
3507 SubMapType usisElement;
3508 usisElement[isrListener->getSeq()] = isrListener;
3509 // VFALCO NOTE This is making a needless copy of naAccountID
3510 subMap.insert(simIterator, make_pair(naAccountID, usisElement));
3511 }
3512 else
3513 {
3514 // Found, note that the account has another listener.
3515 simIterator->second[isrListener->getSeq()] = isrListener;
3516 }
3517 }
3518}
3519
// Unsubscribe `isrListener` from each account in `vnaAccountIDs`: first drop
// the listener-side records, then remove the server-side map entries via
// unsubAccountInternal (which takes mSubLock).
// NOTE(review): the declarator line (3521) is missing from this listing —
// presumably NetworkOPsImp::unsubAccount.
3520 void
3522 InfoSub::ref isrListener,
3523 hash_set<AccountID> const& vnaAccountIDs,
3524 bool rt)
3525 {
3526 for (auto const& naAccountID : vnaAccountIDs)
3527 {
3528 // Remove from the InfoSub
3529 isrListener->deleteSubAccountInfo(naAccountID, rt);
3530 }
3531
3532 // Remove from the server
3533 unsubAccountInternal(isrListener->getSeq(), vnaAccountIDs, rt);
3534}
3535
// Remove subscriber `uSeq` from the server-side per-account maps for each
// account in `vnaAccountIDs`, deleting a map entry entirely once its last
// listener is gone. `rt` selects the real-time versus validated-only map.
// NOTE(review): the declarator line (3537) is missing from this listing —
// presumably NetworkOPsImp::unsubAccountInternal.
3536 void
3538 std::uint64_t uSeq,
3539 hash_set<AccountID> const& vnaAccountIDs,
3540 bool rt)
3541 {
3542 std::lock_guard const sl(mSubLock);
3543
3544 SubInfoMapType& subMap = rt ? mSubRTAccount : mSubAccount;
3545
3546 for (auto const& naAccountID : vnaAccountIDs)
3547 {
3548 auto simIterator = subMap.find(naAccountID);
3549
3550 if (simIterator != subMap.end())
3551 {
3552 // Found
3553 simIterator->second.erase(uSeq);
3554
3555 if (simIterator->second.empty())
3556 {
3557 // Don't need hash entry.
3558 subMap.erase(simIterator);
3559 }
3560 }
3561 }
3562}
3563
3564 void
// Schedule a background job that streams an account's historical transactions
// to an account_history subscriber, walking validated ledgers backward in
// 1024-ledger windows until the account's first transaction (or genesis, or a
// stop request) is reached.
// NOTE(review): declarator line (3565) lost in extraction; this is
// NetworkOPsImp::addAccountHistoryJob(SubAccountHistoryInfoWeak subInfo) per
// the UNREACHABLE strings below — confirm against the repository.
3566{
// Snapshot the backend type once per process; only SQLite supports the
// paged queries this job needs.
3567 enum DatabaseType { Sqlite, None };
3568 static auto const databaseType = [&]() -> DatabaseType {
3569 // Use a dynamic_cast to return DatabaseType::None
3570 // on failure.
3571 if (dynamic_cast<SQLiteDatabase*>(&registry_.get().getRelationalDatabase()))
3572 {
3573 return DatabaseType::Sqlite;
3574 }
3575 return DatabaseType::None;
3576 }();
3577
3578 if (databaseType == DatabaseType::None)
3579 {
3580 // LCOV_EXCL_START
3581 UNREACHABLE("xrpl::NetworkOPsImp::addAccountHistoryJob : no database");
3582 JLOG(m_journal.error()) << "AccountHistory job for account "
3583 << toBase58(subInfo.index_->accountId_) << " no database";
// Tell the subscriber (if still connected) and drop the subscription.
3584 if (auto sptr = subInfo.sinkWptr_.lock(); sptr)
3585 {
3586 sptr->send(rpcError(rpcINTERNAL), true);
3587 unsubAccountHistory(sptr, subInfo.index_->accountId_, false);
3588 }
3589 return;
3590 // LCOV_EXCL_STOP
3591 }
3592
3593 registry_.get().getJobQueue().addJob(
3594 jtCLIENT_ACCT_HIST, "HistTxStream", [this, dbType = databaseType, subInfo]() {
3595 auto const& accountId = subInfo.index_->accountId_;
3596 auto& lastLedgerSeq = subInfo.index_->historyLastLedgerSeq_;
3597 auto& txHistoryIndex = subInfo.index_->historyTxIndex_;
3598
3599 JLOG(m_journal.trace()) << "AccountHistory job for account " << toBase58(accountId)
3600 << " started. lastLedgerSeq=" << lastLedgerSeq;
3601
// Detect the account's earliest transaction, which terminates the walk.
3602 auto isFirstTx = [&](std::shared_ptr<Transaction> const& tx,
3603 std::shared_ptr<TxMeta> const& meta) -> bool {
3604 /*
3605 * genesis account: first tx is the one with seq 1
3606 * other account: first tx is the one created the account
3607 */
3608 if (accountId == genesisAccountId)
3609 {
3610 auto stx = tx->getSTransaction();
3611 if (stx->getAccountID(sfAccount) == accountId && stx->getSeqValue() == 1)
3612 return true;
3613 }
3614
// A created AccountRoot appears in the metadata's NewFields.
3615 for (auto& node : meta->getNodes())
3616 {
3617 if (node.getFieldU16(sfLedgerEntryType) != ltACCOUNT_ROOT)
3618 continue;
3619
3620 if (node.isFieldPresent(sfNewFields))
3621 {
3622 if (auto inner =
3623 dynamic_cast<STObject const*>(node.peekAtPField(sfNewFields));
3624 inner)
3625 {
3626 if (inner->isFieldPresent(sfAccount) &&
3627 inner->getAccountID(sfAccount) == accountId)
3628 {
3629 return true;
3630 }
3631 }
3632 }
3633 }
3634
3635 return false;
3636 };
3637
// Deliver a JSON object to the subscriber; returns false if the
// InfoSub is gone (weak pointer expired).
3638 auto send = [&](Json::Value const& jvObj, bool unsubscribe) -> bool {
3639 if (auto sptr = subInfo.sinkWptr_.lock())
3640 {
3641 sptr->send(jvObj, true);
3642 if (unsubscribe)
3643 unsubAccountHistory(sptr, accountId, false);
3644 return true;
3645 }
3646
3647 return false;
3648 };
3649
// Same as send(), but picks the JSON shape matching the client's
// API version.
3650 auto sendMultiApiJson = [&](MultiApiJson const& jvObj, bool unsubscribe) -> bool {
3651 if (auto sptr = subInfo.sinkWptr_.lock())
3652 {
3653 jvObj.visit(
3654 sptr->getApiVersion(), //
3655 [&](Json::Value const& jv) { sptr->send(jv, true); });
3656
3657 if (unsubscribe)
3658 unsubAccountHistory(sptr, accountId, false);
3659 return true;
3660 }
3661
3662 return false;
3663 };
3664
// Fetch the next page of transactions, newest first, within the
// given ledger range. NOTE(review): lines 3667-3670 and 3675 (third
// parameter, return type, and the AccountTxPageOptions declaration)
// were lost in extraction — confirm against the repository.
3665 auto getMoreTxns = [&](std::uint32_t minLedger,
3666 std::uint32_t maxLedger,
3671 switch (dbType)
3672 {
3673 case Sqlite: {
3674 auto& db = registry_.get().getRelationalDatabase();
3676 accountId, {minLedger, maxLedger}, marker, 0, true};
3677 return db.newestAccountTxPage(options);
3678 }
3679 // LCOV_EXCL_START
3680 default: {
3681 UNREACHABLE(
3682 "xrpl::NetworkOPsImp::addAccountHistoryJob : "
3683 "getMoreTxns : invalid database type");
3684 return {};
3685 }
3686 // LCOV_EXCL_STOP
3687 }
3688 };
3689
3690 /*
3691 * search backward until the genesis ledger or asked to stop
3692 */
3693 while (lastLedgerSeq >= 2 && !subInfo.index_->stopHistorical_)
3694 {
// NOTE(review): feeChargeCount is re-initialized to 0 on every
// iteration, so the "Fee charged N times" message below always
// reports 0 — looks like it was meant to live outside the loop.
3695 int feeChargeCount = 0;
3696 if (auto sptr = subInfo.sinkWptr_.lock(); sptr)
3697 {
3698 sptr->getConsumer().charge(Resource::feeMediumBurdenRPC);
3699 ++feeChargeCount;
3700 }
3701 else
3702 {
3703 JLOG(m_journal.trace())
3704 << "AccountHistory job for account " << toBase58(accountId)
3705 << " no InfoSub. Fee charged " << feeChargeCount << " times.";
3706 return;
3707 }
3708
3709 // try to search in 1024 ledgers till reaching genesis ledgers
3710 auto startLedgerSeq = (lastLedgerSeq > 1024 + 2 ? lastLedgerSeq - 1024 : 2);
3711 JLOG(m_journal.trace()) << "AccountHistory job for account " << toBase58(accountId)
3712 << ", working on ledger range [" << startLedgerSeq << ","
3713 << lastLedgerSeq << "]";
3714
// Only proceed if the whole window is covered by locally
// validated ledgers.
3715 auto haveRange = [&]() -> bool {
3716 std::uint32_t validatedMin = UINT_MAX;
3717 std::uint32_t validatedMax = 0;
3718 auto haveSomeValidatedLedgers =
3719 registry_.get().getLedgerMaster().getValidatedRange(
3720 validatedMin, validatedMax);
3721
3722 return haveSomeValidatedLedgers && validatedMin <= startLedgerSeq &&
3723 lastLedgerSeq <= validatedMax;
3724 }();
3725
3726 if (!haveRange)
3727 {
3728 JLOG(m_journal.debug()) << "AccountHistory reschedule job for account "
3729 << toBase58(accountId) << ", incomplete ledger range ["
3730 << startLedgerSeq << "," << lastLedgerSeq << "]";
// NOTE(review): line 3731 (presumably the reschedule call the
// log message refers to) was lost in extraction — confirm.
3732 return;
3733 }
3734
// Page through the window. NOTE(review): line 3735 (the marker
// declaration used below) was lost in extraction — confirm.
3736 while (!subInfo.index_->stopHistorical_)
3737 {
3738 auto dbResult = getMoreTxns(startLedgerSeq, lastLedgerSeq, marker);
3739 if (!dbResult)
3740 {
3741 // LCOV_EXCL_START
3742 UNREACHABLE(
3743 "xrpl::NetworkOPsImp::addAccountHistoryJob : "
3744 "getMoreTxns failed");
3745 JLOG(m_journal.debug()) << "AccountHistory job for account "
3746 << toBase58(accountId) << " getMoreTxns failed.";
3747 send(rpcError(rpcINTERNAL), true);
3748 return;
3749 // LCOV_EXCL_STOP
3750 }
3751
3752 auto const& txns = dbResult->first;
3753 marker = dbResult->second;
3754 size_t const num_txns = txns.size();
3755 for (size_t i = 0; i < num_txns; ++i)
3756 {
3757 auto const& [tx, meta] = txns[i];
3758
3759 if (!tx || !meta)
3760 {
3761 JLOG(m_journal.debug()) << "AccountHistory job for account "
3762 << toBase58(accountId) << " empty tx or meta.";
3763 send(rpcError(rpcINTERNAL), true);
3764 return;
3765 }
3766 auto curTxLedger =
3767 registry_.get().getLedgerMaster().getLedgerBySeq(tx->getLedger());
3768 if (!curTxLedger)
3769 {
3770 // LCOV_EXCL_START
3771 UNREACHABLE(
3772 "xrpl::NetworkOPsImp::addAccountHistoryJob : "
3773 "getLedgerBySeq failed");
3774 JLOG(m_journal.debug()) << "AccountHistory job for account "
3775 << toBase58(accountId) << " no ledger.";
3776 send(rpcError(rpcINTERNAL), true);
3777 return;
3778 // LCOV_EXCL_STOP
3779 }
3780 std::shared_ptr<STTx const> const stTxn = tx->getSTransaction();
3781 if (!stTxn)
3782 {
3783 // LCOV_EXCL_START
3784 UNREACHABLE(
3785 "NetworkOPsImp::addAccountHistoryJob : "
3786 "getSTransaction failed");
3787 JLOG(m_journal.debug())
3788 << "AccountHistory job for account " << toBase58(accountId)
3789 << " getSTransaction failed.";
3790 send(rpcError(rpcINTERNAL), true);
3791 return;
3792 // LCOV_EXCL_STOP
3793 }
3794
3795 auto const mRef = std::ref(*meta);
3796 auto const trR = meta->getResultTER();
3797 MultiApiJson jvTx = transJson(stTxn, trR, true, curTxLedger, mRef);
3798
// txHistoryIndex decrements per streamed tx; boundary marks the
// last tx of each ledger in this (newest-first) stream.
3799 jvTx.set(jss::account_history_tx_index, txHistoryIndex--);
3800 if (i + 1 == num_txns || txns[i + 1].first->getLedger() != tx->getLedger())
3801 jvTx.set(jss::account_history_boundary, true);
3802
3803 if (isFirstTx(tx, meta))
3804 {
3805 jvTx.set(jss::account_history_tx_first, true);
3806 sendMultiApiJson(jvTx, false);
3807
3808 JLOG(m_journal.trace())
3809 << "AccountHistory job for account " << toBase58(accountId)
3810 << " done, found last tx.";
3811 return;
3812 }
3813
3814 sendMultiApiJson(jvTx, false);
3815 }
3816
3817 if (marker)
3818 {
3819 JLOG(m_journal.trace())
3820 << "AccountHistory job for account " << toBase58(accountId)
3821 << " paging, marker=" << marker->ledgerSeq << ":" << marker->txnSeq;
3822 }
3823 else
3824 {
3825 break;
3826 }
3827 }
3828
// Window exhausted: move to the next-older 1024-ledger window.
3829 if (!subInfo.index_->stopHistorical_)
3830 {
3831 lastLedgerSeq = startLedgerSeq - 1;
3832 if (lastLedgerSeq <= 1)
3833 {
3834 JLOG(m_journal.trace())
3835 << "AccountHistory job for account " << toBase58(accountId)
3836 << " done, reached genesis ledger.";
3837 return;
3838 }
3839 }
3840 }
3841 });
3842}
3843
3844 void
// Initialize an account_history subscription against a validated ledger and
// kick off the backward-streaming job, unless the account does not exist or
// is a genesis account that has produced no transactions yet.
// NOTE(review): lines 3845/3847 (declarator and second parameter, presumably
// SubAccountHistoryInfoWeak& subInfo) were lost in extraction — confirm.
3846 std::shared_ptr<ReadView const> const& ledger,
3848{
3849 subInfo.index_->separationLedgerSeq_ = ledger->seq();
3850 auto const& accountId = subInfo.index_->accountId_;
3851 auto const accountKeylet = keylet::account(accountId);
3852 if (!ledger->exists(accountKeylet))
3853 {
3854 JLOG(m_journal.debug()) << "subAccountHistoryStart, no account " << toBase58(accountId)
3855 << ", no need to add AccountHistory job.";
3856 return;
3857 }
// Genesis account with Sequence==1 has never sent a transaction, so
// there is no history to stream.
3858 if (accountId == genesisAccountId)
3859 {
3860 if (auto const sleAcct = ledger->read(accountKeylet); sleAcct)
3861 {
3862 if (sleAcct->getFieldU32(sfSequence) == 1)
3863 {
3864 JLOG(m_journal.debug())
3865 << "subAccountHistoryStart, genesis account " << toBase58(accountId)
3866 << " does not have tx, no need to add AccountHistory job.";
3867 return;
3868 }
3869 }
3870 else
3871 {
3872 // LCOV_EXCL_START
3873 UNREACHABLE(
3874 "xrpl::NetworkOPsImp::subAccountHistoryStart : failed to "
3875 "access genesis account");
3876 return;
3877 // LCOV_EXCL_STOP
3878 }
3879 }
3880 subInfo.index_->historyLastLedgerSeq_ = ledger->seq();
3881 subInfo.index_->haveHistorical_ = true;
3882
3883 JLOG(m_journal.debug()) << "subAccountHistoryStart, add AccountHistory job: accountId="
3884 << toBase58(accountId) << ", currentLedgerSeq=" << ledger->seq();
3885
3886 addAccountHistoryJob(subInfo);
3887}
3888
// Register an account_history subscription for a listener. Returns
// rpcINVALID_PARAMS if this listener is already subscribed to the account,
// rpcSUCCESS otherwise (streaming is deferred if no validated ledger exists).
// NOTE(review): lines 3889-3890 (return type and declarator, presumably
// error_code_i NetworkOPsImp::subAccountHistory(InfoSub::ref, AccountID
// const&)) were lost in extraction — confirm against the repository.
3891{
3892 if (!isrListener->insertSubAccountHistory(accountId))
3893 {
3894 JLOG(m_journal.debug()) << "subAccountHistory, already subscribed to account "
3895 << toBase58(accountId);
3896 return rpcINVALID_PARAMS;
3897 }
3898
3899 std::lock_guard const sl(mSubLock);
// NOTE(review): line 3900 (the declaration of `ahi` used below) was lost
// in extraction — confirm.
3901 auto simIterator = mSubAccountHistory.find(accountId);
3902 if (simIterator == mSubAccountHistory.end())
3903 {
// NOTE(review): line 3904 (the declaration of `inner`) was lost in
// extraction — confirm.
3905 inner.emplace(isrListener->getSeq(), ahi);
3906 mSubAccountHistory.insert(simIterator, std::make_pair(accountId, inner));
3907 }
3908 else
3909 {
3910 simIterator->second.emplace(isrListener->getSeq(), ahi);
3911 }
3912
3913 auto const ledger = registry_.get().getLedgerMaster().getValidatedLedger();
3914 if (ledger)
3915 {
3916 subAccountHistoryStart(ledger, ahi);
3917 }
3918 else
3919 {
3920 // The node does not have validated ledgers, so wait for
3921 // one before start streaming.
3922 // In this case, the subscription is also considered successful.
3923 JLOG(m_journal.debug()) << "subAccountHistory, no validated ledger yet, delay start";
3924 }
3925
3926 return rpcSUCCESS;
3927}
3928
3929 void
// Remove an account_history subscription; when historyOnly is true only the
// historical streaming is stopped and the listener-side record is kept.
// NOTE(review): declarator line (3930) lost in extraction; presumably
// NetworkOPsImp::unsubAccountHistory — confirm against the repository.
3931 InfoSub::ref isrListener,
3932 AccountID const& account,
3933 bool historyOnly)
3934{
3935 if (!historyOnly)
3936 isrListener->deleteSubAccountHistory(account);
3937 unsubAccountHistoryInternal(isrListener->getSeq(), account, historyOnly);
3938}
3939
3940 void
// Server-side half of account_history unsubscription: signal the streaming
// job to stop and, unless historyOnly, erase the map entry.
// NOTE(review): declarator line (3941) lost in extraction; presumably
// NetworkOPsImp::unsubAccountHistoryInternal — confirm against the repository.
3942 std::uint64_t seq,
3943 AccountID const& account,
3944 bool historyOnly)
3945{
3946 std::lock_guard const sl(mSubLock);
3947 auto simIterator = mSubAccountHistory.find(account);
3948 if (simIterator != mSubAccountHistory.end())
3949 {
3950 auto& subInfoMap = simIterator->second;
3951 auto subInfoIter = subInfoMap.find(seq);
3952 if (subInfoIter != subInfoMap.end())
3953 {
// Flag checked by the background job; it exits at the next check.
3954 subInfoIter->second.index_->stopHistorical_ = true;
3955 }
3956
3957 if (!historyOnly)
3958 {
3959 simIterator->second.erase(seq);
3960 if (simIterator->second.empty())
3961 {
3962 mSubAccountHistory.erase(simIterator);
3963 }
3964 }
3965 JLOG(m_journal.debug()) << "unsubAccountHistory, account " << toBase58(account)
3966 << ", historyOnly = " << (historyOnly ? "true" : "false");
3967 }
3968}
3969
3970 bool
// Subscribe a listener to an order book's change notifications. Always
// returns true; a null listener set is an internal error (asserted).
// NOTE(review): declarator line (3971) lost in extraction; presumably
// NetworkOPsImp::subBook(InfoSub::ref, Book const&) — confirm.
3972{
3973 if (auto listeners = registry_.get().getOrderBookDB().makeBookListeners(book))
3974 {
3975 listeners->addSubscriber(isrListener);
3976 }
3977 else
3978 {
3979 // LCOV_EXCL_START
3980 UNREACHABLE("xrpl::NetworkOPsImp::subBook : null book listeners");
3981 // LCOV_EXCL_STOP
3982 }
3983 return true;
3984}
3985
3986 bool
// Unsubscribe a listener (by sequence number) from an order book; a missing
// listener set is silently ignored. Always returns true.
// NOTE(review): declarator line (3987) lost in extraction; presumably
// NetworkOPsImp::unsubBook(std::uint64_t, Book const&) — confirm.
3988{
3989 if (auto listeners = registry_.get().getOrderBookDB().getBookListeners(book))
3990 listeners->removeSubscriber(uSeq);
3991
3992 return true;
3993}
3994
// Standalone-mode ledger close (driven by the `ledger_accept` RPC): simulate
// one consensus round and return the new current ledger sequence.
// NOTE(review): lines 3995-3996 (return type and declarator, presumably
// NetworkOPsImp::acceptLedger(std::optional<std::chrono::milliseconds>
// consensusDelay)) were lost in extraction — confirm against the repository.
3997{
3998 // This code-path is exclusively used when the server is in standalone
3999 // mode via `ledger_accept`
4000 XRPL_ASSERT(m_standalone, "xrpl::NetworkOPsImp::acceptLedger : is standalone");
4001
4002 if (!m_standalone)
4003 Throw<std::runtime_error>("Operation only possible in STANDALONE mode.");
4004
4005 // FIXME Could we improve on this and remove the need for a specialized
4006 // API in Consensus?
4007 beginConsensus(m_ledgerMaster.getClosedLedger()->header().hash, {});
4008 mConsensus.simulate(registry_.get().getTimeKeeper().closeTime(), consensusDelay);
4009 return m_ledgerMaster.getCurrentLedger()->header().seq;
4010}
4011
4012// <-- bool: true=added, false=already there
4013 bool
// Subscribe to the "ledger" stream, seeding jvResult with a snapshot of the
// latest validated ledger (sequence, hash, close time, fees, reserves,
// network id) and the complete-ledger range.
// NOTE(review): declarator line (4014) and the condition at line 4030 were
// lost in extraction; presumably NetworkOPsImp::subLedger — confirm.
4015{
4016 if (auto lpClosed = m_ledgerMaster.getValidatedLedger())
4017 {
4018 jvResult[jss::ledger_index] = lpClosed->header().seq;
4019 jvResult[jss::ledger_hash] = to_string(lpClosed->header().hash);
4020 jvResult[jss::ledger_time] =
4021 Json::Value::UInt(lpClosed->header().closeTime.time_since_epoch().count());
// fee_ref is only reported on networks without the XRPFees amendment.
4022 if (!lpClosed->rules().enabled(featureXRPFees))
4023 jvResult[jss::fee_ref] = FEE_UNITS_DEPRECATED;
4024 jvResult[jss::fee_base] = lpClosed->fees().base.jsonClipped();
4025 jvResult[jss::reserve_base] = lpClosed->fees().reserve.jsonClipped();
4026 jvResult[jss::reserve_inc] = lpClosed->fees().increment.jsonClipped();
4027 jvResult[jss::network_id] = registry_.get().getNetworkIDService().getNetworkID();
4028 }
4029
4031 {
4032 jvResult[jss::validated_ledgers] = registry_.get().getLedgerMaster().getCompleteLedgers();
4033 }
4034
4035 std::lock_guard const sl(mSubLock);
4036 return mStreamMaps[sLedger].emplace(isrListener->getSeq(), isrListener).second;
4037}
4038
4039// <-- bool: true=added, false=already there
4040 bool
// Subscribe to the "book_changes" stream. NOTE(review): declarator line
// (4041) lost in extraction; presumably NetworkOPsImp::subBookChanges.
4042{
4043 std::lock_guard const sl(mSubLock);
4044 return mStreamMaps[sBookChanges].emplace(isrListener->getSeq(), isrListener).second;
4045}
4046
4047// <-- bool: true=erased, false=was not there
4048 bool
// Unsubscribe (by sequence number) from the "ledger" stream. NOTE(review):
// declarator line (4049) lost in extraction; presumably unsubLedger.
4050{
4051 std::lock_guard const sl(mSubLock);
4052 return mStreamMaps[sLedger].erase(uSeq) != 0u;
4053}
4054
4055// <-- bool: true=erased, false=was not there
4056 bool
// Unsubscribe from the "book_changes" stream. NOTE(review): declarator line
// (4057) lost in extraction; presumably unsubBookChanges.
4058{
4059 std::lock_guard const sl(mSubLock);
4060 return mStreamMaps[sBookChanges].erase(uSeq) != 0u;
4061}
4062
4063// <-- bool: true=added, false=already there
4064 bool
// Subscribe to the "manifests" stream. NOTE(review): declarator line (4065)
// lost in extraction; presumably NetworkOPsImp::subManifests.
4066{
4067 std::lock_guard const sl(mSubLock);
4068 return mStreamMaps[sManifests].emplace(isrListener->getSeq(), isrListener).second;
4069}
4070
4071// <-- bool: true=erased, false=was not there
4072 bool
// Unsubscribe from the "manifests" stream. NOTE(review): declarator line
// (4073) lost in extraction; presumably unsubManifests.
4074{
4075 std::lock_guard const sl(mSubLock);
4076 return mStreamMaps[sManifests].erase(uSeq) != 0u;
4077}
4078
4079// <-- bool: true=added, false=already there
4080 bool
// Subscribe to the "server" stream and populate jvResult with the initial
// server snapshot: operating mode, load metrics, host id, node public key,
// and a random value. `admin` gates the detail level of status/hostid.
4081NetworkOPsImp::subServer(InfoSub::ref isrListener, Json::Value& jvResult, bool admin)
4082{
4083 uint256 uRandom;
4084
// stand_alone is only reported when actually running standalone.
4085 if (m_standalone)
4086 jvResult[jss::stand_alone] = m_standalone;
4087
4088 // CHECKME: is it necessary to provide a random number here?
4089 beast::rngfill(uRandom.begin(), uRandom.size(), crypto_prng());
4090
4091 auto const& feeTrack = registry_.get().getFeeTrack();
4092 jvResult[jss::random] = to_string(uRandom);
4093 jvResult[jss::server_status] = strOperatingMode(admin);
4094 jvResult[jss::load_base] = feeTrack.getLoadBase();
4095 jvResult[jss::load_factor] = feeTrack.getLoadFactor();
4096 jvResult[jss::hostid] = getHostId(admin);
4097 jvResult[jss::pubkey_node] =
4098 toBase58(TokenType::NodePublic, registry_.get().getApp().nodeIdentity().first);
4099
4100 std::lock_guard const sl(mSubLock);
4101 return mStreamMaps[sServer].emplace(isrListener->getSeq(), isrListener).second;
4102}
4103
4104// <-- bool: true=erased, false=was not there
4105 bool
// Unsubscribe from the "server" stream. NOTE(review): declarator line (4106)
// lost in extraction; presumably unsubServer.
4107{
4108 std::lock_guard const sl(mSubLock);
4109 return mStreamMaps[sServer].erase(uSeq) != 0u;
4110}
4111
4112// <-- bool: true=added, false=already there
4113 bool
// Subscribe to the "transactions" stream (all validated transactions).
// NOTE(review): declarator line (4114) lost in extraction.
4115{
4116 std::lock_guard const sl(mSubLock);
4117 return mStreamMaps[sTransactions].emplace(isrListener->getSeq(), isrListener).second;
4118}
4119
4120// <-- bool: true=erased, false=was not there
4121 bool
// Unsubscribe from the "transactions" stream. NOTE(review): declarator line
// (4122) lost in extraction.
4123{
4124 std::lock_guard const sl(mSubLock);
4125 return mStreamMaps[sTransactions].erase(uSeq) != 0u;
4126}
4127
4128// <-- bool: true=added, false=already there
4129 bool
// Subscribe to the real-time (proposed) transactions stream. NOTE(review):
// declarator line (4130) lost in extraction.
4131{
4132 std::lock_guard const sl(mSubLock);
4133 return mStreamMaps[sRTTransactions].emplace(isrListener->getSeq(), isrListener).second;
4134}
4135
4136// <-- bool: true=erased, false=was not there
4137 bool
// Unsubscribe from the real-time transactions stream. NOTE(review):
// declarator line (4138) lost in extraction.
4139{
4140 std::lock_guard const sl(mSubLock);
4141 return mStreamMaps[sRTTransactions].erase(uSeq) != 0u;
4142}
4143
4144// <-- bool: true=added, false=already there
4145 bool
// Subscribe to the "validations" stream. NOTE(review): declarator line
// (4146) lost in extraction.
4147{
4148 std::lock_guard const sl(mSubLock);
4149 return mStreamMaps[sValidations].emplace(isrListener->getSeq(), isrListener).second;
4150}
4151
4152void
4157
4158// <-- bool: true=erased, false=was not there
4159 bool
// Unsubscribe from the "validations" stream. NOTE(review): declarator line
// (4160) lost in extraction.
4161{
4162 std::lock_guard const sl(mSubLock);
4163 return mStreamMaps[sValidations].erase(uSeq) != 0u;
4164}
4165
4166// <-- bool: true=added, false=already there
4167 bool
// Subscribe to the "peer_status" stream. NOTE(review): declarator line
// (4168) lost in extraction.
4169{
4170 std::lock_guard const sl(mSubLock);
4171 return mStreamMaps[sPeerStatus].emplace(isrListener->getSeq(), isrListener).second;
4172}
4173
4174// <-- bool: true=erased, false=was not there
4175 bool
// Unsubscribe from the "peer_status" stream. NOTE(review): declarator line
// (4176) lost in extraction.
4177{
4178 std::lock_guard const sl(mSubLock);
4179 return mStreamMaps[sPeerStatus].erase(uSeq) != 0u;
4180}
4181
4182// <-- bool: true=added, false=already there
4183 bool
// Subscribe to the "consensus" (phase) stream. NOTE(review): declarator line
// (4184) lost in extraction.
4185{
4186 std::lock_guard const sl(mSubLock);
4187 return mStreamMaps[sConsensusPhase].emplace(isrListener->getSeq(), isrListener).second;
4188}
4189
4190// <-- bool: true=erased, false=was not there
4191 bool
// Unsubscribe from the "consensus" stream. NOTE(review): declarator line
// (4192) lost in extraction.
4193{
4194 std::lock_guard const sl(mSubLock);
4195 return mStreamMaps[sConsensusPhase].erase(uSeq) != 0u;
4196}
4197
// Look up an RPC subscription entry by URL; returns an empty pointer when
// not found. Takes mSubLock, so mSubLock must be recursive if called while
// already held (see tryRemoveRpcSub) — presumably it is; confirm.
// NOTE(review): lines 4198-4199 (return type InfoSub::pointer and the
// declarator) were lost in extraction — confirm against the repository.
4200{
4201 std::lock_guard const sl(mSubLock);
4202
4203 subRpcMapType::iterator const it = mRpcSubMap.find(strUrl);
4204
4205 if (it != mRpcSubMap.end())
4206 return it->second;
4207
4208 return InfoSub::pointer();
4209}
4210
// Register (or keep) an RPC subscription entry under the given URL and echo
// the entry back. NOTE(review): lines 4211-4212 (return type and declarator,
// presumably InfoSub::pointer NetworkOPsImp::addRpcSub(std::string const&,
// InfoSub::ref)) were lost in extraction — confirm. Note emplace() is a
// no-op if the URL is already present.
4213{
4214 std::lock_guard const sl(mSubLock);
4215
4216 mRpcSubMap.emplace(strUrl, rspEntry);
4217
4218 return rspEntry;
4219}
4220
4221 bool
// Remove an RPC subscription entry by URL, but only when no stream map still
// references its subscriber sequence. Returns true iff the entry was erased.
// NOTE(review): declarator line (4222) lost in extraction; presumably
// NetworkOPsImp::tryRemoveRpcSub(std::string const&) — confirm.
4223{
// findRpcSub() below re-acquires mSubLock while we hold it — this is only
// safe if mSubLock is a recursive mutex; confirm its declaration.
4224 std::lock_guard const sl(mSubLock);
4225 auto pInfo = findRpcSub(strUrl);
4226
4227 if (!pInfo)
4228 return false;
4229
4230 // check to see if any of the stream maps still hold a weak reference to
4231 // this entry before removing
4232 for (SubMapType const& map : mStreamMaps)
4233 {
4234 if (map.contains(pInfo->getSeq()))
4235 return false;
4236 }
4237 mRpcSubMap.erase(strUrl);
4238 return true;
4239}
4240
4241#ifndef USE_NEW_BOOK_PAGE
4242
4243// NIKB FIXME this should be looked at. There's no reason why this shouldn't
4244// work, but it demonstrated poor performance.
4245//
4246 void
// Build the JSON "offers" array for one side of an order book (old code
// path, selected when USE_NEW_BOOK_PAGE is not defined). Walks directory
// pages in quality order, computing funded amounts per offer and keeping a
// running per-owner balance (umBalance) so later offers from the same owner
// see funds already committed by earlier ones.
// NOTE(review): lines 4247-4248 (declarator and ledger parameter, presumably
// NetworkOPsImp::getBookPage(std::shared_ptr<ReadView const>& lpLedger) and
// line 4258 (the umBalance declaration) were lost in extraction — confirm.
4249 Book const& book,
4250 AccountID const& uTakerID,
4251 bool const bProof,
4252 unsigned int iLimit,
4253 Json::Value const& jvMarker,
4254 Json::Value& jvResult)
4255{ // CAUTION: This is the old get book page logic
4256 Json::Value& jvOffers = (jvResult[jss::offers] = Json::Value(Json::arrayValue));
4258
4259 uint256 const uBookBase = getBookBase(book);
4260 uint256 const uBookEnd = getQualityNext(uBookBase);
4261 uint256 uTipIndex = uBookBase;
4262
4263 if (auto stream = m_journal.trace())
4264 {
4265 stream << "getBookPage:" << book;
4266 stream << "getBookPage: uBookBase=" << uBookBase;
4267 stream << "getBookPage: uBookEnd=" << uBookEnd;
4268 stream << "getBookPage: uTipIndex=" << uTipIndex;
4269 }
4270
4271 ReadView const& view = *lpLedger;
4272
4273 bool const bGlobalFreeze =
4274 isGlobalFrozen(view, book.out.account) || isGlobalFrozen(view, book.in.account);
4275
4276 bool bDone = false;
4277 bool bDirectAdvance = true;
4278
4279 std::shared_ptr<SLE const> sleOfferDir;
4280 uint256 offerIndex;
4281 unsigned int uBookEntry = 0;
4282 STAmount saDirRate;
4283
4284 auto const rate = transferRate(view, book.out.account);
4285 auto viewJ = registry_.get().getJournal("View");
4286
// Outer loop: one iteration per offer, bounded by iLimit; bDirectAdvance
// moves to the next quality directory when the current page is exhausted.
4287 while (!bDone && iLimit-- > 0)
4288 {
4289 if (bDirectAdvance)
4290 {
4291 bDirectAdvance = false;
4292
4293 JLOG(m_journal.trace()) << "getBookPage: bDirectAdvance";
4294
4295 auto const ledgerIndex = view.succ(uTipIndex, uBookEnd);
4296 if (ledgerIndex)
4297 {
4298 sleOfferDir = view.read(keylet::page(*ledgerIndex));
4299 }
4300 else
4301 {
4302 sleOfferDir.reset();
4303 }
4304
4305 if (!sleOfferDir)
4306 {
4307 JLOG(m_journal.trace()) << "getBookPage: bDone";
4308 bDone = true;
4309 }
4310 else
4311 {
4312 uTipIndex = sleOfferDir->key();
4313 saDirRate = amountFromQuality(getQuality(uTipIndex));
4314
4315 cdirFirst(view, uTipIndex, sleOfferDir, uBookEntry, offerIndex);
4316
4317 JLOG(m_journal.trace()) << "getBookPage: uTipIndex=" << uTipIndex;
4318 JLOG(m_journal.trace()) << "getBookPage: offerIndex=" << offerIndex;
4319 }
4320 }
4321
4322 if (!bDone)
4323 {
4324 auto sleOffer = view.read(keylet::offer(offerIndex));
4325
4326 if (sleOffer)
4327 {
4328 auto const uOfferOwnerID = sleOffer->getAccountID(sfAccount);
4329 auto const& saTakerGets = sleOffer->getFieldAmount(sfTakerGets);
4330 auto const& saTakerPays = sleOffer->getFieldAmount(sfTakerPays);
4331 STAmount saOwnerFunds;
4332 bool firstOwnerOffer(true);
4333
4334 if (book.out.account == uOfferOwnerID)
4335 {
4336 // If an offer is selling issuer's own IOUs, it is fully
4337 // funded.
4338 saOwnerFunds = saTakerGets;
4339 }
4340 else if (bGlobalFreeze)
4341 {
4342 // If either asset is globally frozen, consider all offers
4343 // that aren't ours to be totally unfunded
4344 saOwnerFunds.clear(book.out);
4345 }
4346 else
4347 {
4348 auto umBalanceEntry = umBalance.find(uOfferOwnerID);
4349 if (umBalanceEntry != umBalance.end())
4350 {
4351 // Found in running balance table.
4352
4353 saOwnerFunds = umBalanceEntry->second;
4354 firstOwnerOffer = false;
4355 }
4356 else
4357 {
4358 // Did not find balance in table.
4359
// NOTE(review): line 4365 (presumably the freeze-handling
// argument, e.g. fhZERO_IF_FROZEN) was lost in extraction.
4360 saOwnerFunds = accountHolds(
4361 view,
4362 uOfferOwnerID,
4363 book.out.currency,
4364 book.out.account,
4366 viewJ);
4367
4368 if (saOwnerFunds < beast::zero)
4369 {
4370 // Treat negative funds as zero.
4371
4372 saOwnerFunds.clear();
4373 }
4374 }
4375 }
4376
4377 Json::Value jvOffer = sleOffer->getJson(JsonOptions::none);
4378
4379 STAmount saTakerGetsFunded;
4380 STAmount saOwnerFundsLimit = saOwnerFunds;
4381 Rate offerRate = parityRate;
4382
4383 if (rate != parityRate
4384 // Have a transfer fee.
4385 && uTakerID != book.out.account
4386 // Not taking offers of own IOUs.
4387 && book.out.account != uOfferOwnerID)
4388 // Offer owner not issuing ownfunds
4389 {
4390 // Need to charge a transfer fee to offer owner.
4391 offerRate = rate;
4392 saOwnerFundsLimit = divide(saOwnerFunds, offerRate);
4393 }
4394
4395 if (saOwnerFundsLimit >= saTakerGets)
4396 {
4397 // Sufficient funds no shenanigans.
4398 saTakerGetsFunded = saTakerGets;
4399 }
4400 else
4401 {
4402 // Only provide, if not fully funded.
4403
4404 saTakerGetsFunded = saOwnerFundsLimit;
4405
4406 saTakerGetsFunded.setJson(jvOffer[jss::taker_gets_funded]);
4407 std::min(
4408 saTakerPays, multiply(saTakerGetsFunded, saDirRate, saTakerPays.issue()))
4409 .setJson(jvOffer[jss::taker_pays_funded]);
4410 }
4411
4412 STAmount const saOwnerPays = (parityRate == offerRate)
4413 ? saTakerGetsFunded
4414 : std::min(saOwnerFunds, multiply(saTakerGetsFunded, offerRate));
4415
// Deduct what this offer would consume so the owner's later offers
// in this page are shown with the remaining funds.
4416 umBalance[uOfferOwnerID] = saOwnerFunds - saOwnerPays;
4417
4418 // Include all offers funded and unfunded
4419 Json::Value& jvOf = jvOffers.append(jvOffer);
4420 jvOf[jss::quality] = saDirRate.getText();
4421
// owner_funds is emitted only on the owner's first offer in the page.
4422 if (firstOwnerOffer)
4423 jvOf[jss::owner_funds] = saOwnerFunds.getText();
4424 }
4425 else
4426 {
4427 JLOG(m_journal.warn()) << "Missing offer";
4428 }
4429
4430 if (!cdirNext(view, uTipIndex, sleOfferDir, uBookEntry, offerIndex))
4431 {
4432 bDirectAdvance = true;
4433 }
4434 else
4435 {
4436 JLOG(m_journal.trace()) << "getBookPage: offerIndex=" << offerIndex;
4437 }
4438 }
4439 }
4440
4441 // jvResult[jss::marker] = Json::Value(Json::arrayValue);
4442 // jvResult[jss::nodes] = Json::Value(Json::arrayValue);
4443}
4444
4445#else
4446
4447// This is the new code that uses the book iterators
4448// It has temporarily been disabled
4449
4450 void
// Alternative getBookPage implementation using OrderBookIterator (compiled
// only when USE_NEW_BOOK_PAGE is defined; currently disabled for performance
// per the comment above). Unlike the old path it omits zero-funded offers
// that do not belong to the taker.
// NOTE(review): lines 4451-4452 (declarator and ledger parameter) and 4461
// (the umBalance declaration) were lost in extraction — confirm.
4453 Book const& book,
4454 AccountID const& uTakerID,
4455 bool const bProof,
4456 unsigned int iLimit,
4457 Json::Value const& jvMarker,
4458 Json::Value& jvResult)
4459{
4460 auto& jvOffers = (jvResult[jss::offers] = Json::Value(Json::arrayValue));
4462
4463
4464 MetaView lesActive(lpLedger, tapNONE, true);
4465 OrderBookIterator obIterator(lesActive, book);
4466
4467 auto const rate = transferRate(lesActive, book.out.account);
4468
4469 bool const bGlobalFreeze =
4470 lesActive.isGlobalFrozen(book.out.account) || lesActive.isGlobalFrozen(book.in.account);
4471
4472 while (iLimit-- > 0 && obIterator.nextOffer())
4473 {
4474 SLE::pointer sleOffer = obIterator.getCurrentOffer();
4475 if (sleOffer)
4476 {
4477 auto const uOfferOwnerID = sleOffer->getAccountID(sfAccount);
4478 auto const& saTakerGets = sleOffer->getFieldAmount(sfTakerGets);
4479 auto const& saTakerPays = sleOffer->getFieldAmount(sfTakerPays);
4480 STAmount saDirRate = obIterator.getCurrentRate();
4481 STAmount saOwnerFunds;
4482
4483 if (book.out.account == uOfferOwnerID)
4484 {
4485 // If offer is selling issuer's own IOUs, it is fully funded.
4486 saOwnerFunds = saTakerGets;
4487 }
4488 else if (bGlobalFreeze)
4489 {
4490 // If either asset is globally frozen, consider all offers
4491 // that aren't ours to be totally unfunded
4492 saOwnerFunds.clear(book.out);
4493 }
4494 else
4495 {
4496 auto umBalanceEntry = umBalance.find(uOfferOwnerID);
4497
4498 if (umBalanceEntry != umBalance.end())
4499 {
4500 // Found in running balance table.
4501
4502 saOwnerFunds = umBalanceEntry->second;
4503 }
4504 else
4505 {
4506 // Did not find balance in table.
4507
4508 saOwnerFunds = lesActive.accountHolds(
4509 uOfferOwnerID, book.out.currency, book.out.account, fhZERO_IF_FROZEN);
4510
4511 if (saOwnerFunds.isNegative())
4512 {
4513 // Treat negative funds as zero.
4514
4515 saOwnerFunds.zero();
4516 }
4517 }
4518 }
4519
4520 Json::Value jvOffer = sleOffer->getJson(JsonOptions::none);
4521
4522 STAmount saTakerGetsFunded;
4523 STAmount saOwnerFundsLimit = saOwnerFunds;
4524 Rate offerRate = parityRate;
4525
4526 if (rate != parityRate
4527 // Have a transfer fee.
4528 && uTakerID != book.out.account
4529 // Not taking offers of own IOUs.
4530 && book.out.account != uOfferOwnerID)
4531 // Offer owner not issuing ownfunds
4532 {
4533 // Need to charge a transfer fee to offer owner.
4534 offerRate = rate;
4535 saOwnerFundsLimit = divide(saOwnerFunds, offerRate);
4536 }
4537
4538 if (saOwnerFundsLimit >= saTakerGets)
4539 {
4540 // Sufficient funds no shenanigans.
4541 saTakerGetsFunded = saTakerGets;
4542 }
4543 else
4544 {
4545 // Only provide, if not fully funded.
4546 saTakerGetsFunded = saOwnerFundsLimit;
4547
4548 saTakerGetsFunded.setJson(jvOffer[jss::taker_gets_funded]);
4549
4550 // TODO(tom): The result of this expression is not used - what's
4551 // going on here?
// NOTE(review): the std::min(...) result IS used — .setJson() below
// writes it into jvOffer[jss::taker_pays_funded]; the TODO is stale.
4552 std::min(saTakerPays, multiply(saTakerGetsFunded, saDirRate, saTakerPays.issue()))
4553 .setJson(jvOffer[jss::taker_pays_funded]);
4554 }
4555
4556 STAmount saOwnerPays = (parityRate == offerRate)
4557 ? saTakerGetsFunded
4558 : std::min(saOwnerFunds, multiply(saTakerGetsFunded, offerRate));
4559
4560 umBalance[uOfferOwnerID] = saOwnerFunds - saOwnerPays;
4561
4562 if (!saOwnerFunds.isZero() || uOfferOwnerID == uTakerID)
4563 {
4564 // Only provide funded offers and offers of the taker.
4565 Json::Value& jvOf = jvOffers.append(jvOffer);
4566 jvOf[jss::quality] = saDirRate.getText();
4567 }
4568 }
4569 }
4570
4571 // jvResult[jss::marker] = Json::Value(Json::arrayValue);
4572 // jvResult[jss::nodes] = Json::Value(Json::arrayValue);
4573}
4574
4575#endif
4576
4577 inline void
// Publish per-operating-mode durations and transition counts into m_stats
// (insight gauges), folding the in-progress interval of the current mode
// into its counter first.
// NOTE(review): line 4578 (declarator, presumably
// NetworkOPsImp::collect_metrics()) and the alternating m_stats.*.set(
// lines (4582, 4586, 4588, 4590, 4592, 4596, 4598, 4600, 4602, 4604) were
// lost in extraction — confirm against the repository.
4579{
4580 auto [counters, mode, start, initialSync] = accounting_.getCounterData();
4581 auto const current = std::chrono::duration_cast<std::chrono::microseconds>(
4583 counters[static_cast<std::size_t>(mode)].dur += current;
4584
4585 std::lock_guard const lock(m_statsMutex);
4587 counters[static_cast<std::size_t>(OperatingMode::DISCONNECTED)].dur.count());
4589 counters[static_cast<std::size_t>(OperatingMode::CONNECTED)].dur.count());
4591 counters[static_cast<std::size_t>(OperatingMode::SYNCING)].dur.count());
4593 counters[static_cast<std::size_t>(OperatingMode::TRACKING)].dur.count());
4594 m_stats.full_duration.set(counters[static_cast<std::size_t>(OperatingMode::FULL)].dur.count());
4595
4597 counters[static_cast<std::size_t>(OperatingMode::DISCONNECTED)].transitions);
4599 counters[static_cast<std::size_t>(OperatingMode::CONNECTED)].transitions);
4601 counters[static_cast<std::size_t>(OperatingMode::SYNCING)].transitions);
4603 counters[static_cast<std::size_t>(OperatingMode::TRACKING)].transitions);
4605 counters[static_cast<std::size_t>(OperatingMode::FULL)].transitions);
4606}
4607
4608 void
// Record a transition into operating mode `om`: bump its transition count,
// charge the elapsed interval to the outgoing mode, and capture the initial
// sync duration on the first entry into FULL.
// NOTE(review): declarator line (4609) lost in extraction; presumably
// NetworkOPsImp::StateAccounting::mode(OperatingMode om) — confirm.
4610{
4611 auto now = std::chrono::steady_clock::now();
4612
4613 std::lock_guard const lock(mutex_);
4614 ++counters_[static_cast<std::size_t>(om)].transitions;
// First time reaching FULL == initial sync complete; measured from
// process start.
4615 if (om == OperatingMode::FULL && counters_[static_cast<std::size_t>(om)].transitions == 1)
4616 {
4617 initialSyncUs_ =
4618 std::chrono::duration_cast<std::chrono::microseconds>(now - processStart_).count();
4619 }
4620 counters_[static_cast<std::size_t>(mode_)].dur +=
4621 std::chrono::duration_cast<std::chrono::microseconds>(now - start_);
4622
4623 mode_ = om;
4624 start_ = now;
4625}
4626
4627 void
// Render the per-mode accounting counters into `obj` as the
// "state_accounting" object, plus the current state's running duration and
// (if known) the initial sync duration.
// NOTE(review): lines 4628 (declarator), 4632 (the now() - start expression)
// and 4636 (the for-loop initializer) were lost in extraction — confirm.
4629{
4630 auto [counters, mode, start, initialSync] = getCounterData();
4631 auto const current = std::chrono::duration_cast<std::chrono::microseconds>(
4633 counters[static_cast<std::size_t>(mode)].dur += current;
4634
4635 obj[jss::state_accounting] = Json::objectValue;
4637 i <= static_cast<std::size_t>(OperatingMode::FULL);
4638 ++i)
4639 {
4640 obj[jss::state_accounting][states_[i]] = Json::objectValue;
4641 auto& state = obj[jss::state_accounting][states_[i]];
4642 state[jss::transitions] = std::to_string(counters[i].transitions);
4643 state[jss::duration_us] = std::to_string(counters[i].dur.count());
4644 }
4645 obj[jss::server_state_duration_us] = std::to_string(current.count());
4646 if (initialSync != 0u)
4647 obj[jss::initial_sync_duration_us] = std::to_string(initialSync);
4648}
4649
4650//------------------------------------------------------------------------------
4651
// Factory for the NetworkOPs service: forwards every argument to the
// NetworkOPsImp constructor.
// NOTE(review): lines 4652-4653 (return type and declarator, presumably
// std::unique_ptr<NetworkOPs> make_NetworkOPs(...)), 4655/4660 (the clock
// and ledger-master parameters forwarded below) and 4666/4673 (the
// make_unique call and forwarded argument) were lost in extraction —
// confirm against the repository.
4654 ServiceRegistry& registry,
4656 bool standalone,
4657 std::size_t minPeerCount,
4658 bool startValid,
4659 JobQueue& jobQueue,
4661 ValidatorKeys const& validatorKeys,
4662 boost::asio::io_context& ioCtx,
4663 beast::Journal journal,
4664 beast::insight::Collector::ptr const& collector)
4665{
4667 registry,
4668 clock,
4669 standalone,
4670 minPeerCount,
4671 startValid,
4672 jobQueue,
4674 validatorKeys,
4675 ioCtx,
4676 journal,
4677 collector);
4678}
4679
4680} // namespace xrpl
T any_of(T... args)
T back_inserter(T... args)
T begin(T... args)
Decorator for streaming out compact json.
Lightweight wrapper to tag static string.
Definition json_value.h:44
Represents a JSON value.
Definition json_value.h:130
const_iterator begin() const
Json::UInt UInt
Definition json_value.h:137
Value & append(Value const &value)
Append value to array at the end.
bool isMember(char const *key) const
Return true if the object has a member named key.
Value get(UInt index, Value const &defaultValue) const
If the array contains at least index+1 elements, returns the element value, otherwise returns default...
A generic endpoint for log messages.
Definition Journal.h:40
Stream error() const
Definition Journal.h:319
Stream debug() const
Definition Journal.h:301
Stream info() const
Definition Journal.h:307
Stream trace() const
Severity stream access functions.
Definition Journal.h:295
Stream warn() const
Definition Journal.h:313
A metric for measuring an integral value.
Definition Gauge.h:20
void set(value_type value) const
Set the value on the gauge.
Definition Gauge.h:48
A reference to a handler for performing polled collection.
Definition Hook.h:12
A transaction that is in a closed ledger.
TxMeta const & getMeta() const
boost::container::flat_set< AccountID > const & getAffected() const
std::shared_ptr< STTx const > const & getTxn() const
Specifies an order book.
Definition Book.h:16
Issue in
Definition Book.h:18
Issue out
Definition Book.h:19
Holds transactions which were deferred to the next pass of consensus.
The role of a ClosureCounter is to assist in shutdown by letting callers wait for the completion of c...
std::uint32_t getLoadFee() const
Definition ClusterNode.h:32
NetClock::time_point getReportTime() const
Definition ClusterNode.h:38
PublicKey const & identity() const
Definition ClusterNode.h:44
std::string const & name() const
Definition ClusterNode.h:26
std::shared_ptr< InfoSub > pointer
Definition InfoSub.h:33
Currency currency
Definition Issue.h:15
AccountID account
Definition Issue.h:16
A pool of threads to perform work.
Definition JobQueue.h:38
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:147
Json::Value getJson(int c=0)
Definition JobQueue.cpp:173
std::chrono::seconds getValidatedLedgerAge()
bool haveValidated()
Whether we have ever fully validated a ledger.
std::size_t getFetchPackCacheSize() const
std::shared_ptr< Ledger const > getClosedLedger()
std::shared_ptr< Ledger const > getValidatedLedger()
std::shared_ptr< ReadView const > getPublishedLedger()
std::shared_ptr< ReadView const > getCurrentLedger()
Manages the current fee schedule.
Manages load sources.
Definition LoadManager.h:26
void heartbeat()
Reset the stall detection timer.
State accounting records two attributes for each possible server state: 1) Amount of time spent in ea...
void json(Json::Value &obj) const
Output state counters in JSON format.
std::chrono::steady_clock::time_point const processStart_
static std::array< Json::StaticString const, 5 > const states_
std::array< Counters, 5 > counters_
void mode(OperatingMode om)
Record state transition.
std::chrono::steady_clock::time_point start_
Transaction with input flags and results to be applied in batches.
std::shared_ptr< Transaction > const transaction
TransactionStatus(std::shared_ptr< Transaction > t, bool a, bool l, FailHard f)
std::string getHostId(bool forAdmin)
void reportConsensusStateChange(ConsensusPhase phase)
void addAccountHistoryJob(SubAccountHistoryInfoWeak subInfo)
void clearNeedNetworkLedger() override
ServerFeeSummary mLastFeeSummary
Json::Value getOwnerInfo(std::shared_ptr< ReadView const > lpLedger, AccountID const &account) override
DispatchState mDispatchState
std::size_t const minPeerCount_
beast::Journal m_journal
static std::array< char const *, 5 > const states_
std::set< uint256 > pendingValidations_
void pubAccountTransaction(std::shared_ptr< ReadView const > const &ledger, AcceptedLedgerTx const &transaction, bool last)
ClosureCounter< void, boost::system::error_code const & > waitHandlerCounter_
MultiApiJson transJson(std::shared_ptr< STTx const > const &transaction, TER result, bool validated, std::shared_ptr< ReadView const > const &ledger, std::optional< std::reference_wrapper< TxMeta const > > meta)
bool unsubManifests(std::uint64_t uListener) override
void pubPeerStatus(std::function< Json::Value(void)> const &) override
void unsubAccount(InfoSub::ref ispListener, hash_set< AccountID > const &vnaAccountIDs, bool rt) override
bool subManifests(InfoSub::ref ispListener) override
void stateAccounting(Json::Value &obj) override
void pubLedger(std::shared_ptr< ReadView const > const &lpAccepted) override
void stop() override
SubInfoMapType mSubRTAccount
void subAccount(InfoSub::ref ispListener, hash_set< AccountID > const &vnaAccountIDs, bool rt) override
void transactionBatch()
Apply transactions in batches.
bool unsubRTTransactions(std::uint64_t uListener) override
void getBookPage(std::shared_ptr< ReadView const > &lpLedger, Book const &, AccountID const &uTakerID, bool const bProof, unsigned int iLimit, Json::Value const &jvMarker, Json::Value &jvResult) override
bool processTrustedProposal(RCLCxPeerPos proposal) override
error_code_i subAccountHistory(InfoSub::ref ispListener, AccountID const &account) override
subscribe an account's new transactions and retrieve the account's historical transactions
void subAccountHistoryStart(std::shared_ptr< ReadView const > const &ledger, SubAccountHistoryInfoWeak &subInfo)
void pubValidation(std::shared_ptr< STValidation > const &val) override
bool subBook(InfoSub::ref ispListener, Book const &) override
InfoSub::pointer addRpcSub(std::string const &strUrl, InfoSub::ref) override
void endConsensus(std::unique_ptr< std::stringstream > const &clog) override
std::atomic< OperatingMode > mMode
void setMode(OperatingMode om) override
void setAmendmentBlocked() override
void pubConsensus(ConsensusPhase phase)
bool const m_standalone
std::recursive_mutex mSubLock
bool isNeedNetworkLedger() override
DispatchState
Synchronization states for transaction batches.
std::atomic< bool > needNetworkLedger_
boost::asio::steady_timer heartbeatTimer_
bool subConsensus(InfoSub::ref ispListener) override
std::reference_wrapper< ServiceRegistry > registry_
bool unsubBook(std::uint64_t uListener, Book const &) override
bool unsubLedger(std::uint64_t uListener) override
bool checkLastClosedLedger(Overlay::PeerSequence const &, uint256 &networkClosed)
void pubProposedAccountTransaction(std::shared_ptr< ReadView const > const &ledger, std::shared_ptr< STTx const > const &transaction, TER result)
void unsubAccountHistoryInternal(std::uint64_t seq, AccountID const &account, bool historyOnly) override
void pubValidatedTransaction(std::shared_ptr< ReadView const > const &ledger, AcceptedLedgerTx const &transaction, bool last)
void switchLastClosedLedger(std::shared_ptr< Ledger const > const &newLCL)
std::optional< PublicKey > const validatorPK_
std::atomic< bool > amendmentBlocked_
bool isFull() override
void clearAmendmentWarned() override
void updateLocalTx(ReadView const &view) override
void clearLedgerFetch() override
void apply(std::unique_lock< std::mutex > &batchLock)
Attempt to apply transactions and post-process based on the results.
InfoSub::pointer findRpcSub(std::string const &strUrl) override
bool isAmendmentBlocked() override
std::string strOperatingMode(OperatingMode const mode, bool const admin) const override
std::unique_ptr< LocalTxs > m_localTX
NetworkOPsImp(ServiceRegistry &registry, NetworkOPs::clock_type &clock, bool standalone, std::size_t minPeerCount, bool start_valid, JobQueue &job_queue, LedgerMaster &ledgerMaster, ValidatorKeys const &validatorKeys, boost::asio::io_context &ioCtx, beast::Journal journal, beast::insight::Collector::ptr const &collector)
void setStandAlone() override
void setNeedNetworkLedger() override
bool subServer(InfoSub::ref ispListener, Json::Value &jvResult, bool admin) override
void setTimer(boost::asio::steady_timer &timer, std::chrono::milliseconds const &expiry_time, std::function< void()> onExpire, std::function< void()> onError)
bool unsubServer(std::uint64_t uListener) override
SubAccountHistoryMapType mSubAccountHistory
bool unsubConsensus(std::uint64_t uListener) override
std::condition_variable mCond
void pubManifest(Manifest const &) override
RCLConsensus mConsensus
void consensusViewChange() override
boost::asio::steady_timer accountHistoryTxTimer_
Json::Value getConsensusInfo() override
bool recvValidation(std::shared_ptr< STValidation > const &val, std::string const &source) override
void setUNLBlocked() override
bool unsubValidations(std::uint64_t uListener) override
bool subPeerStatus(InfoSub::ref ispListener) override
void doTransactionAsync(std::shared_ptr< Transaction > transaction, bool bUnlimited, FailHard failtype)
For transactions not submitted by a locally connected client, fire and forget.
ConsensusPhase mLastConsensusPhase
OperatingMode getOperatingMode() const override
std::optional< PublicKey > const validatorMasterPK_
void doTransactionSyncBatch(std::unique_lock< std::mutex > &lock, std::function< bool(std::unique_lock< std::mutex > const &)> retryCallback)
std::array< SubMapType, SubTypes::sLastEntry > mStreamMaps
std::vector< TransactionStatus > mTransactions
bool tryRemoveRpcSub(std::string const &strUrl) override
bool beginConsensus(uint256 const &networkClosed, std::unique_ptr< std::stringstream > const &clog) override
void doTransactionSync(std::shared_ptr< Transaction > transaction, bool bUnlimited, FailHard failType)
For transactions submitted directly by a client, apply batch of transactions and wait for this transa...
void submitTransaction(std::shared_ptr< STTx const > const &) override
void setAmendmentWarned() override
LedgerMaster & m_ledgerMaster
Json::Value getServerInfo(bool human, bool admin, bool counters) override
StateAccounting accounting_
bool subValidations(InfoSub::ref ispListener) override
void setAccountHistoryJobTimer(SubAccountHistoryInfoWeak subInfo)
bool subRTTransactions(InfoSub::ref ispListener) override
std::atomic< bool > unlBlocked_
subRpcMapType mRpcSubMap
bool unsubBookChanges(std::uint64_t uListener) override
void unsubAccountHistory(InfoSub::ref ispListener, AccountID const &account, bool historyOnly) override
unsubscribe an account's transactions
void setStateTimer() override
Called to initially start our timers.
std::size_t getLocalTxCount() override
bool preProcessTransaction(std::shared_ptr< Transaction > &transaction)
void processTransaction(std::shared_ptr< Transaction > &transaction, bool bUnlimited, bool bLocal, FailHard failType) override
Process transactions as they arrive from the network or which are submitted by clients.
bool unsubTransactions(std::uint64_t uListener) override
bool isAmendmentWarned() override
bool subTransactions(InfoSub::ref ispListener) override
std::mutex m_statsMutex
std::mutex validationsMutex_
std::uint32_t acceptLedger(std::optional< std::chrono::milliseconds > consensusDelay) override
Accepts the current transaction tree, return the new ledger's sequence.
SubInfoMapType mSubAccount
void clearUNLBlocked() override
bool isUNLBlocked() override
void pubProposedTransaction(std::shared_ptr< ReadView const > const &ledger, std::shared_ptr< STTx const > const &transaction, TER result) override
std::atomic< bool > amendmentWarned_
boost::asio::steady_timer clusterTimer_
bool unsubPeerStatus(std::uint64_t uListener) override
void reportFeeChange() override
void processTransactionSet(CanonicalTXSet const &set) override
Process a set of transactions synchronously, and ensuring that they are processed in one batch.
void mapComplete(std::shared_ptr< SHAMap > const &map, bool fromAcquire) override
bool subLedger(InfoSub::ref ispListener, Json::Value &jvResult) override
bool isBlocked() override
~NetworkOPsImp() override
Json::Value getLedgerFetchInfo() override
void unsubAccountInternal(std::uint64_t seq, hash_set< AccountID > const &vnaAccountIDs, bool rt) override
bool subBookChanges(InfoSub::ref ispListener) override
Provides server functionality for clients.
Definition NetworkOPs.h:71
Writable ledger view that accumulates state and tx changes.
Definition OpenView.h:45
Manages the generic consensus algorithm for use by the RCL.
std::size_t prevProposers() const
Get the number of proposing peers that participated in the previous round.
void simulate(NetClock::time_point const &now, std::optional< std::chrono::milliseconds > consensusDelay)
Json::Value getJson(bool full) const
std::chrono::milliseconds prevRoundTime() const
Get duration of the previous round.
A peer's signed, proposed position for use in RCLConsensus.
PublicKey const & publicKey() const
Public key of peer that sent the proposal.
Represents a set of transactions in RCLConsensus.
Definition RCLCxTx.h:43
Wraps a ledger instance for use in generic Validations LedgerTrie.
static std::string getWordFromBlob(void const *blob, size_t bytes)
Chooses a single dictionary word from the data.
Definition RFC1751.cpp:430
Collects logging information.
std::unique_ptr< std::stringstream > const & ss()
A view into a ledger.
Definition ReadView.h:31
virtual std::optional< key_type > succ(key_type const &key, std::optional< key_type > const &last=std::nullopt) const =0
Return the key of the next state item.
virtual std::shared_ptr< SLE const > read(Keylet const &k) const =0
Return the state item associated with a key.
Issue const & issue() const
Definition STAmount.h:470
std::string getText() const override
Definition STAmount.cpp:656
void setJson(Json::Value &) const
Definition STAmount.cpp:616
std::size_t size() const noexcept
Definition Serializer.h:50
void const * data() const noexcept
Definition Serializer.h:56
Service registry for dependency injection.
static time_point now()
Validator keys and manifest as set in configuration file.
Json::Value jsonClipped() const
Definition XRPAmount.h:197
constexpr double decimalXRP() const
Definition XRPAmount.h:241
iterator begin()
Definition base_uint.h:112
bool isZero() const
Definition base_uint.h:513
static constexpr std::size_t size()
Definition base_uint.h:499
bool isNonZero() const
Definition base_uint.h:518
Automatically unlocks and re-locks a unique_lock object.
Definition scope.h:202
T clear(T... args)
T emplace_back(T... args)
T emplace(T... args)
T empty(T... args)
T end(T... args)
T erase(T... args)
T find(T... args)
T get(T... args)
T insert(T... args)
T is_same_v
T is_sorted(T... args)
T lock(T... args)
T make_pair(T... args)
T max(T... args)
T min(T... args)
@ arrayValue
array value (ordered list)
Definition json_value.h:25
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:26
int Int
unsigned int UInt
void rngfill(void *const buffer, std::size_t const bytes, Generator &g)
Definition rngfill.h:14
STL namespace.
std::string const & getVersionString()
Server version.
Definition BuildInfo.cpp:68
std::optional< std::string > encodeCTID(uint32_t ledgerSeq, uint32_t txnIndex, uint32_t networkID) noexcept
Encodes ledger sequence, transaction index, and network ID into a CTID string.
Definition CTID.h:33
Json::Value computeBookChanges(std::shared_ptr< L const > const &lpAccepted)
Definition BookChanges.h:27
void insertMPTokenIssuanceID(Json::Value &response, std::shared_ptr< STTx const > const &transaction, TxMeta const &transactionMeta)
void insertDeliveredAmount(Json::Value &meta, ReadView const &, std::shared_ptr< STTx const > const &serializedTx, TxMeta const &)
Add a delivered_amount field to the meta input/output parameter.
void insertNFTSyntheticInJson(Json::Value &, std::shared_ptr< STTx const > const &, TxMeta const &)
Adds common synthetic fields to transaction-related JSON responses.
Charge const feeMediumBurdenRPC
TER valid(STTx const &tx, ReadView const &view, AccountID const &src, beast::Journal j)
std::string const & getCommitHash()
Definition Git.cpp:18
std::string const & getBuildBranch()
Definition Git.cpp:25
Keylet offer(AccountID const &id, std::uint32_t seq) noexcept
An offer from an account.
Definition Indexes.cpp:243
Keylet account(AccountID const &id) noexcept
AccountID root.
Definition Indexes.cpp:165
Keylet page(uint256 const &root, std::uint64_t index=0) noexcept
A page in a directory.
Definition Indexes.cpp:342
Rate rate(Env &env, Account const &account, std::uint32_t const &seq)
Definition escrow.cpp:50
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
std::unique_ptr< FeeVote > make_FeeVote(FeeSetup const &setup, beast::Journal journal)
Create an instance of the FeeVote logic.
STAmount divide(STAmount const &amount, Rate const &rate)
Definition Rate2.cpp:69
@ terQUEUED
Definition TER.h:205
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
constexpr FlagValue tfInnerBatchTxn
Definition TxFlags.h:41
bool isTerRetry(TER x) noexcept
Definition TER.h:645
@ fhZERO_IF_FROZEN
@ fhIGNORE_FREEZE
csprng_engine & crypto_prng()
The default cryptographically secure PRNG.
std::optional< std::uint64_t > mulDiv(std::uint64_t value, std::uint64_t mul, std::uint64_t div)
Return value*mul/div accurately.
Json::Value getJson(LedgerFill const &fill)
Return a new Json::Value representing the ledger with given options.
@ INVALID
Definition Transaction.h:29
@ OBSOLETE
Definition Transaction.h:35
@ INCLUDED
Definition Transaction.h:30
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:602
T get(Section const &section, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.
std::string strHex(FwdIt begin, FwdIt end)
Definition strHex.h:10
std::pair< Validity, std::string > checkValidity(HashRouter &router, STTx const &tx, Rules const &rules)
Checks transaction signature and local checks.
Definition apply.cpp:21
Rules makeRulesGivenLedger(DigestAwareReadView const &ledger, Rules const &current)
Definition ReadView.cpp:50
@ tefPAST_SEQ
Definition TER.h:155
std::uint64_t getQuality(uint256 const &uBase)
Definition Indexes.cpp:131
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
Definition AccountID.cpp:92
FeeSetup setup_FeeVote(Section const &section)
Definition Config.cpp:1159
STAmount accountHolds(ReadView const &view, AccountID const &account, Currency const &currency, AccountID const &issuer, FreezeHandling zeroIfFrozen, beast::Journal j, SpendableHandling includeFullBalance=shSIMPLE_BALANCE)
Number root(Number f, unsigned d)
Definition Number.cpp:958
bool transResultInfo(TER code, std::string &token, std::string &text)
Definition TER.cpp:228
bool cdirNext(ReadView const &view, uint256 const &root, std::shared_ptr< SLE const > &page, unsigned int &index, uint256 &entry)
Returns the next entry in the directory, advancing the index.
STAmount multiply(STAmount const &amount, Rate const &rate)
Definition Rate2.cpp:34
static auto const genesisAccountId
void forAllApiVersions(Fn const &fn, Args &&... args)
Definition ApiVersion.h:149
Seed generateSeed(std::string const &passPhrase)
Generate a seed deterministically.
Definition Seed.cpp:57
bool cdirFirst(ReadView const &view, uint256 const &root, std::shared_ptr< SLE const > &page, unsigned int &index, uint256 &entry)
Returns the first entry in the directory, advancing the index.
std::pair< PublicKey, SecretKey > generateKeyPair(KeyType type, Seed const &seed)
Generate a key pair deterministically.
@ current
This was a new validation and was added.
constexpr std::size_t maxPoppedTransactions
STAmount accountFunds(ReadView const &view, AccountID const &id, STAmount const &saDefault, FreezeHandling freezeHandling, beast::Journal j)
bool isGlobalFrozen(ReadView const &view, AccountID const &issuer)
Check if the issuer has the global freeze flag set.
STAmount amountFromQuality(std::uint64_t rate)
Definition STAmount.cpp:975
@ jtNETOP_CLUSTER
Definition Job.h:54
@ jtCLIENT_CONSENSUS
Definition Job.h:27
@ jtTXN_PROC
Definition Job.h:61
@ jtCLIENT_ACCT_HIST
Definition Job.h:28
@ jtTRANSACTION
Definition Job.h:41
@ jtCLIENT_FEE_CHANGE
Definition Job.h:26
@ jtBATCH
Definition Job.h:44
bool isTefFailure(TER x) noexcept
Definition TER.h:639
Rate transferRate(ReadView const &view, AccountID const &issuer)
Returns IOU issuer transfer fee as Rate.
auto constexpr muldiv_max
Definition mulDiv.h:8
uint256 getQualityNext(uint256 const &uBase)
Definition Indexes.cpp:123
ConsensusPhase
Phases of consensus for a single ledger round.
@ open
We haven't closed our ledger yet, but others might have.
send_if_pred< Predicate > send_if(std::shared_ptr< Message > const &m, Predicate const &f)
Helper function to aid in type deduction.
Definition predicates.h:54
constexpr std::uint32_t FEE_UNITS_DEPRECATED
AccountID calcAccountID(PublicKey const &pk)
uint256 getBookBase(Book const &book)
Definition Indexes.cpp:98
Json::Value rpcError(error_code_i iError)
Definition RPCErr.cpp:12
std::string to_string_iso(date::sys_time< Duration > tp)
Definition chrono.h:68
std::unique_ptr< LocalTxs > make_LocalTxs()
Definition LocalTxs.cpp:172
ApplyFlags
Definition ApplyView.h:10
@ tapNONE
Definition ApplyView.h:11
@ tapFAIL_HARD
Definition ApplyView.h:15
@ tapUNLIMITED
Definition ApplyView.h:22
bool isTelLocal(TER x) noexcept
Definition TER.h:627
@ ledgerMaster
ledger master data for signing
@ proposal
proposal for signing
@ temINVALID_FLAG
Definition TER.h:91
@ temBAD_SIGNATURE
Definition TER.h:85
bool isTesSuccess(TER x) noexcept
Definition TER.h:651
static std::uint32_t trunc32(std::uint64_t v)
static std::array< char const *, 5 > const stateNames
void handleNewValidation(Application &app, std::shared_ptr< STValidation > const &val, std::string const &source, BypassAccept const bypassAccept, std::optional< beast::Journal > j)
Handle a new validation.
OperatingMode
Specifies the mode under which the server believes it's operating.
Definition NetworkOPs.h:50
@ TRACKING
convinced we agree with the network
@ DISCONNECTED
not ready to process requests
@ CONNECTED
convinced we are talking to the network
@ FULL
we have the ledger and can even validate
@ SYNCING
fallen slightly behind
std::shared_ptr< STTx const > sterilize(STTx const &stx)
Sterilize a transaction.
Definition STTx.cpp:800
bool isTemMalformed(TER x) noexcept
Definition TER.h:633
@ tesSUCCESS
Definition TER.h:225
std::unique_ptr< NetworkOPs > make_NetworkOPs(ServiceRegistry &registry, NetworkOPs::clock_type &clock, bool standalone, std::size_t minPeerCount, bool startValid, JobQueue &jobQueue, LedgerMaster &ledgerMaster, ValidatorKeys const &validatorKeys, boost::asio::io_context &ioCtx, beast::Journal journal, beast::insight::Collector::ptr const &collector)
error_code_i
Definition ErrorCodes.h:20
@ rpcINTERNAL
Definition ErrorCodes.h:110
@ rpcINVALID_PARAMS
Definition ErrorCodes.h:64
@ rpcSUCCESS
Definition ErrorCodes.h:24
Rate const parityRate
A transfer rate signifying a 1:1 exchange.
@ warnRPC_AMENDMENT_BLOCKED
Definition ErrorCodes.h:153
@ warnRPC_UNSUPPORTED_MAJORITY
Definition ErrorCodes.h:152
@ warnRPC_EXPIRED_VALIDATOR_LIST
Definition ErrorCodes.h:154
T owns_lock(T... args)
T ref(T... args)
T reserve(T... args)
T reset(T... args)
T set_intersection(T... args)
T size(T... args)
T str(T... args)
PublicKey masterKey
The master key associated with this manifest.
Definition Manifest.h:66
std::string serialized
The manifest in serialized form.
Definition Manifest.h:63
Blob getMasterSignature() const
Returns manifest master key signature.
std::string domain
The domain, if one was specified in the manifest; empty otherwise.
Definition Manifest.h:78
std::optional< Blob > getSignature() const
Returns manifest signature.
std::optional< PublicKey > signingKey
The ephemeral key associated with this manifest.
Definition Manifest.h:72
std::uint32_t sequence
The sequence number of this manifest.
Definition Manifest.h:75
Server fees published on server subscription.
std::optional< TxQ::Metrics > em
bool operator!=(ServerFeeSummary const &b) const
bool operator==(ServerFeeSummary const &b) const
beast::insight::Gauge syncing_duration
beast::insight::Gauge tracking_duration
beast::insight::Gauge connected_duration
beast::insight::Gauge tracking_transitions
Stats(Handler const &handler, beast::insight::Collector::ptr const &collector)
beast::insight::Gauge connected_transitions
beast::insight::Gauge full_transitions
beast::insight::Gauge disconnected_duration
beast::insight::Gauge syncing_transitions
beast::insight::Gauge disconnected_transitions
beast::insight::Gauge full_duration
beast::insight::Hook hook
SubAccountHistoryIndex(AccountID const &accountId)
std::shared_ptr< SubAccountHistoryIndex > index_
std::shared_ptr< SubAccountHistoryIndex > index_
Represents a transfer rate.
Definition Rate.h:20
Data format for exchanging consumption information across peers.
Definition Gossip.h:12
std::vector< Item > items
Definition Gossip.h:24
Changes in trusted nodes after updating validator list.
hash_set< NodeID > added
hash_set< NodeID > removed
Structure returned by TxQ::getMetrics, expressed in reference fee level units.
Definition TxQ.h:144
void set(char const *key, auto const &v)
IsMemberResult isMember(char const *key) const
Select all peers (except optional excluded) that are in our cluster.
Definition predicates.h:115
Sends a message to all peers.
Definition predicates.h:12
T swap(T... args)
T time_since_epoch(T... args)
T to_string(T... args)
T unlock(T... args)
T value_or(T... args)
T what(T... args)