rippled
Loading...
Searching...
No Matches
NetworkOPs.cpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of rippled: https://github.com/ripple/rippled
4 Copyright (c) 2012, 2013 Ripple Labs Inc.
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#include <xrpld/app/consensus/RCLConsensus.h>
21#include <xrpld/app/consensus/RCLValidations.h>
22#include <xrpld/app/ledger/AcceptedLedger.h>
23#include <xrpld/app/ledger/InboundLedgers.h>
24#include <xrpld/app/ledger/LedgerMaster.h>
25#include <xrpld/app/ledger/LedgerToJson.h>
26#include <xrpld/app/ledger/LocalTxs.h>
27#include <xrpld/app/ledger/OpenLedger.h>
28#include <xrpld/app/ledger/OrderBookDB.h>
29#include <xrpld/app/ledger/TransactionMaster.h>
30#include <xrpld/app/main/LoadManager.h>
31#include <xrpld/app/main/Tuning.h>
32#include <xrpld/app/misc/AmendmentTable.h>
33#include <xrpld/app/misc/DeliverMax.h>
34#include <xrpld/app/misc/HashRouter.h>
35#include <xrpld/app/misc/LoadFeeTrack.h>
36#include <xrpld/app/misc/NetworkOPs.h>
37#include <xrpld/app/misc/Transaction.h>
38#include <xrpld/app/misc/TxQ.h>
39#include <xrpld/app/misc/ValidatorKeys.h>
40#include <xrpld/app/misc/ValidatorList.h>
41#include <xrpld/app/misc/detail/AccountTxPaging.h>
42#include <xrpld/app/rdb/backend/SQLiteDatabase.h>
43#include <xrpld/app/tx/apply.h>
44#include <xrpld/consensus/Consensus.h>
45#include <xrpld/consensus/ConsensusParms.h>
46#include <xrpld/overlay/Cluster.h>
47#include <xrpld/overlay/Overlay.h>
48#include <xrpld/overlay/predicates.h>
49#include <xrpld/perflog/PerfLog.h>
50#include <xrpld/rpc/BookChanges.h>
51#include <xrpld/rpc/CTID.h>
52#include <xrpld/rpc/DeliveredAmount.h>
53#include <xrpld/rpc/MPTokenIssuanceID.h>
54#include <xrpld/rpc/ServerHandler.h>
55
56#include <xrpl/basics/UptimeClock.h>
57#include <xrpl/basics/mulDiv.h>
58#include <xrpl/basics/safe_cast.h>
59#include <xrpl/basics/scope.h>
60#include <xrpl/beast/utility/rngfill.h>
61#include <xrpl/crypto/RFC1751.h>
62#include <xrpl/crypto/csprng.h>
63#include <xrpl/protocol/BuildInfo.h>
64#include <xrpl/protocol/Feature.h>
65#include <xrpl/protocol/MultiApiJson.h>
66#include <xrpl/protocol/NFTSyntheticSerializer.h>
67#include <xrpl/protocol/RPCErr.h>
68#include <xrpl/protocol/TxFlags.h>
69#include <xrpl/protocol/jss.h>
70#include <xrpl/resource/Fees.h>
71#include <xrpl/resource/ResourceManager.h>
72
73#include <boost/asio/ip/host_name.hpp>
74#include <boost/asio/steady_timer.hpp>
75
76#include <algorithm>
77#include <exception>
78#include <mutex>
79#include <optional>
80#include <set>
81#include <sstream>
82#include <string>
83#include <tuple>
84#include <unordered_map>
85
86namespace ripple {
87
88class NetworkOPsImp final : public NetworkOPs
89{
95 {
96 public:
98 bool const admin;
99 bool const local;
101 bool applied = false;
103
106 bool a,
107 bool l,
108 FailHard f)
109 : transaction(t), admin(a), local(l), failType(f)
110 {
111 XRPL_ASSERT(
113 "ripple::NetworkOPsImp::TransactionStatus::TransactionStatus : "
114 "valid inputs");
115 }
116 };
117
121 enum class DispatchState : unsigned char {
122 none,
123 scheduled,
124 running,
125 };
126
128
144 {
152
156 std::chrono::steady_clock::time_point start_ =
158 std::chrono::steady_clock::time_point const processStart_ = start_;
161
162 public:
164 {
166 .transitions = 1;
167 }
168
175 void
177
183 void
184 json(Json::Value& obj) const;
185
187 {
189 decltype(mode_) mode;
190 decltype(start_) start;
192 };
193
196 {
199 }
200 };
201
204 {
205 ServerFeeSummary() = default;
206
208 XRPAmount fee,
209 TxQ::Metrics&& escalationMetrics,
210 LoadFeeTrack const& loadFeeTrack);
211 bool
212 operator!=(ServerFeeSummary const& b) const;
213
214 bool
216 {
217 return !(*this != b);
218 }
219
224 };
225
226public:
228 Application& app,
230 bool standalone,
231 std::size_t minPeerCount,
232 bool start_valid,
233 JobQueue& job_queue,
235 ValidatorKeys const& validatorKeys,
236 boost::asio::io_context& io_svc,
237 beast::Journal journal,
238 beast::insight::Collector::ptr const& collector)
239 : app_(app)
240 , m_journal(journal)
243 , heartbeatTimer_(io_svc)
244 , clusterTimer_(io_svc)
245 , accountHistoryTxTimer_(io_svc)
246 , mConsensus(
247 app,
249 setup_FeeVote(app_.config().section("voting")),
250 app_.logs().journal("FeeVote")),
252 *m_localTX,
253 app.getInboundTransactions(),
254 beast::get_abstract_clock<std::chrono::steady_clock>(),
255 validatorKeys,
256 app_.logs().journal("LedgerConsensus"))
257 , validatorPK_(
258 validatorKeys.keys ? validatorKeys.keys->publicKey
259 : decltype(validatorPK_){})
261 validatorKeys.keys ? validatorKeys.keys->masterPublicKey
262 : decltype(validatorMasterPK_){})
264 , m_job_queue(job_queue)
265 , m_standalone(standalone)
266 , minPeerCount_(start_valid ? 0 : minPeerCount)
267 , m_stats(std::bind(&NetworkOPsImp::collect_metrics, this), collector)
268 {
269 }
270
271 ~NetworkOPsImp() override
272 {
273 // This clear() is necessary to ensure the shared_ptrs in this map get
274 // destroyed NOW because the objects in this map invoke methods on this
275 // class when they are destroyed
277 }
278
279public:
281 getOperatingMode() const override;
282
284 strOperatingMode(OperatingMode const mode, bool const admin) const override;
285
287 strOperatingMode(bool const admin = false) const override;
288
289 //
290 // Transaction operations.
291 //
292
293 // Must complete immediately.
294 void
296
297 void
299 std::shared_ptr<Transaction>& transaction,
300 bool bUnlimited,
301 bool bLocal,
302 FailHard failType) override;
303
304 void
305 processTransactionSet(CanonicalTXSet const& set) override;
306
315 void
318 bool bUnlimited,
319 FailHard failType);
320
330 void
333 bool bUnlimited,
334 FailHard failtype);
335
336private:
337 bool
339
340 void
343 std::function<bool(std::unique_lock<std::mutex> const&)> retryCallback);
344
345public:
349 void
351
357 void
359
360 //
361 // Owner functions.
362 //
363
367 AccountID const& account) override;
368
369 //
370 // Book functions.
371 //
372
373 void
376 Book const&,
377 AccountID const& uTakerID,
378 bool const bProof,
379 unsigned int iLimit,
380 Json::Value const& jvMarker,
381 Json::Value& jvResult) override;
382
383 // Ledger proposal/close functions.
384 bool
386
387 bool
390 std::string const& source) override;
391
392 void
393 mapComplete(std::shared_ptr<SHAMap> const& map, bool fromAcquire) override;
394
395 // Network state machine.
396
397 // Used for the "jump" case.
398private:
399 void
401 bool
403
404public:
405 bool
407 uint256 const& networkClosed,
408 std::unique_ptr<std::stringstream> const& clog) override;
409 void
411 void
412 setStandAlone() override;
413
417 void
418 setStateTimer() override;
419
420 void
421 setNeedNetworkLedger() override;
422 void
423 clearNeedNetworkLedger() override;
424 bool
425 isNeedNetworkLedger() override;
426 bool
427 isFull() override;
428
429 void
430 setMode(OperatingMode om) override;
431
432 bool
433 isBlocked() override;
434 bool
435 isAmendmentBlocked() override;
436 void
437 setAmendmentBlocked() override;
438 bool
439 isAmendmentWarned() override;
440 void
441 setAmendmentWarned() override;
442 void
443 clearAmendmentWarned() override;
444 bool
445 isUNLBlocked() override;
446 void
447 setUNLBlocked() override;
448 void
449 clearUNLBlocked() override;
450 void
451 consensusViewChange() override;
452
454 getConsensusInfo() override;
456 getServerInfo(bool human, bool admin, bool counters) override;
457 void
458 clearLedgerFetch() override;
460 getLedgerFetchInfo() override;
463 std::optional<std::chrono::milliseconds> consensusDelay) override;
464 void
465 reportFeeChange() override;
466 void
468
469 void
470 updateLocalTx(ReadView const& view) override;
472 getLocalTxCount() override;
473
474 //
475 // Monitoring: publisher side.
476 //
477 void
478 pubLedger(std::shared_ptr<ReadView const> const& lpAccepted) override;
479 void
482 std::shared_ptr<STTx const> const& transaction,
483 TER result) override;
484 void
485 pubValidation(std::shared_ptr<STValidation> const& val) override;
486
487 //--------------------------------------------------------------------------
488 //
489 // InfoSub::Source.
490 //
491 void
493 InfoSub::ref ispListener,
494 hash_set<AccountID> const& vnaAccountIDs,
495 bool rt) override;
496 void
498 InfoSub::ref ispListener,
499 hash_set<AccountID> const& vnaAccountIDs,
500 bool rt) override;
501
502 // Just remove the subscription from the tracking
503 // not from the InfoSub. Needed for InfoSub destruction
504 void
506 std::uint64_t seq,
507 hash_set<AccountID> const& vnaAccountIDs,
508 bool rt) override;
509
511 subAccountHistory(InfoSub::ref ispListener, AccountID const& account)
512 override;
513 void
515 InfoSub::ref ispListener,
516 AccountID const& account,
517 bool historyOnly) override;
518
519 void
521 std::uint64_t seq,
522 AccountID const& account,
523 bool historyOnly) override;
524
525 bool
526 subLedger(InfoSub::ref ispListener, Json::Value& jvResult) override;
527 bool
528 unsubLedger(std::uint64_t uListener) override;
529
530 bool
531 subBookChanges(InfoSub::ref ispListener) override;
532 bool
533 unsubBookChanges(std::uint64_t uListener) override;
534
535 bool
536 subServer(InfoSub::ref ispListener, Json::Value& jvResult, bool admin)
537 override;
538 bool
539 unsubServer(std::uint64_t uListener) override;
540
541 bool
542 subBook(InfoSub::ref ispListener, Book const&) override;
543 bool
544 unsubBook(std::uint64_t uListener, Book const&) override;
545
546 bool
547 subManifests(InfoSub::ref ispListener) override;
548 bool
549 unsubManifests(std::uint64_t uListener) override;
550 void
551 pubManifest(Manifest const&) override;
552
553 bool
554 subTransactions(InfoSub::ref ispListener) override;
555 bool
556 unsubTransactions(std::uint64_t uListener) override;
557
558 bool
559 subRTTransactions(InfoSub::ref ispListener) override;
560 bool
561 unsubRTTransactions(std::uint64_t uListener) override;
562
563 bool
564 subValidations(InfoSub::ref ispListener) override;
565 bool
566 unsubValidations(std::uint64_t uListener) override;
567
568 bool
569 subPeerStatus(InfoSub::ref ispListener) override;
570 bool
571 unsubPeerStatus(std::uint64_t uListener) override;
572 void
573 pubPeerStatus(std::function<Json::Value(void)> const&) override;
574
575 bool
576 subConsensus(InfoSub::ref ispListener) override;
577 bool
578 unsubConsensus(std::uint64_t uListener) override;
579
581 findRpcSub(std::string const& strUrl) override;
583 addRpcSub(std::string const& strUrl, InfoSub::ref) override;
584 bool
585 tryRemoveRpcSub(std::string const& strUrl) override;
586
    /** Shut down NetworkOPs' timers and wait for pending handlers.

        Cancels the heartbeat, cluster, and account-history timers; each
        cancel is wrapped in its own try/catch so a failure to cancel one
        timer does not prevent the others from being cancelled.  Finally
        blocks until any wait handlers still in flight have completed.
    */
    void
    stop() override
    {
        {
            try
            {
                heartbeatTimer_.cancel();
            }
            catch (boost::system::system_error const& e)
            {
                // Log and continue; the remaining timers are still cancelled.
                JLOG(m_journal.error())
                    << "NetworkOPs: heartbeatTimer cancel error: " << e.what();
            }

            try
            {
                clusterTimer_.cancel();
            }
            catch (boost::system::system_error const& e)
            {
                JLOG(m_journal.error())
                    << "NetworkOPs: clusterTimer cancel error: " << e.what();
            }

            try
            {
                accountHistoryTxTimer_.cancel();
            }
            catch (boost::system::system_error const& e)
            {
                JLOG(m_journal.error())
                    << "NetworkOPs: accountHistoryTxTimer cancel error: "
                    << e.what();
            }
        }
        // Make sure that any waitHandlers pending in our timers are done.
        using namespace std::chrono_literals;
        waitHandlerCounter_.join("NetworkOPs", 1s, m_journal);
    }
626
627 void
628 stateAccounting(Json::Value& obj) override;
629
630private:
631 void
632 setTimer(
633 boost::asio::steady_timer& timer,
634 std::chrono::milliseconds const& expiry_time,
635 std::function<void()> onExpire,
636 std::function<void()> onError);
637 void
639 void
641 void
643 void
645
647 transJson(
648 std::shared_ptr<STTx const> const& transaction,
649 TER result,
650 bool validated,
653
654 void
657 AcceptedLedgerTx const& transaction,
658 bool last);
659
660 void
663 AcceptedLedgerTx const& transaction,
664 bool last);
665
666 void
669 std::shared_ptr<STTx const> const& transaction,
670 TER result);
671
672 void
673 pubServer();
674 void
676
678 getHostId(bool forAdmin);
679
680private:
684
685 /*
686 * With a validated ledger to separate history and future, the node
687 * streams historical txns with negative indexes starting from -1,
688 * and streams future txns starting from index 0.
689 * The SubAccountHistoryIndex struct maintains these indexes.
690 * It also has a flag stopHistorical_ for stopping streaming
691 * the historical txns.
692 */
729
733 void
737 void
739 void
741
744
746
748
750
755
757 boost::asio::steady_timer heartbeatTimer_;
758 boost::asio::steady_timer clusterTimer_;
759 boost::asio::steady_timer accountHistoryTxTimer_;
760
762
765
767
769
772
774
776
777 enum SubTypes {
778 sLedger, // Accepted ledgers.
779 sManifests, // Received validator manifests.
780 sServer, // When server changes connectivity state.
781 sTransactions, // All accepted transactions.
782 sRTTransactions, // All proposed and accepted transactions.
783 sValidations, // Received validations.
784 sPeerStatus, // Peer status changes.
785 sConsensusPhase, // Consensus phase
786 sBookChanges, // Per-ledger order book changes
787 sLastEntry // Any new entry must be ADDED ABOVE this one
788 };
789
791
793
795
796 // Whether we are in standalone mode.
797 bool const m_standalone;
798
799 // The number of nodes that we need to consider ourselves connected.
801
802 // Transaction batching.
807
809
812
813private:
814 struct Stats
815 {
816 template <class Handler>
818 Handler const& handler,
819 beast::insight::Collector::ptr const& collector)
820 : hook(collector->make_hook(handler))
821 , disconnected_duration(collector->make_gauge(
822 "State_Accounting",
823 "Disconnected_duration"))
824 , connected_duration(collector->make_gauge(
825 "State_Accounting",
826 "Connected_duration"))
828 collector->make_gauge("State_Accounting", "Syncing_duration"))
829 , tracking_duration(collector->make_gauge(
830 "State_Accounting",
831 "Tracking_duration"))
833 collector->make_gauge("State_Accounting", "Full_duration"))
834 , disconnected_transitions(collector->make_gauge(
835 "State_Accounting",
836 "Disconnected_transitions"))
837 , connected_transitions(collector->make_gauge(
838 "State_Accounting",
839 "Connected_transitions"))
840 , syncing_transitions(collector->make_gauge(
841 "State_Accounting",
842 "Syncing_transitions"))
843 , tracking_transitions(collector->make_gauge(
844 "State_Accounting",
845 "Tracking_transitions"))
847 collector->make_gauge("State_Accounting", "Full_transitions"))
848 {
849 }
850
857
863 };
864
865 std::mutex m_statsMutex; // Mutex to lock m_stats
867
868private:
869 void
871};
872
873//------------------------------------------------------------------------------
874
876 {"disconnected", "connected", "syncing", "tracking", "full"}};
877
879
887
888static auto const genesisAccountId = calcAccountID(
890 .first);
891
892//------------------------------------------------------------------------------
893inline OperatingMode
895{
896 return mMode;
897}
898
899inline std::string
900NetworkOPsImp::strOperatingMode(bool const admin /* = false */) const
901{
902 return strOperatingMode(mMode, admin);
903}
904
905inline void
910
911inline void
916
917inline void
922
923inline bool
928
929inline bool
934
937{
938 static std::string const hostname = boost::asio::ip::host_name();
939
940 if (forAdmin)
941 return hostname;
942
943 // For non-admin uses hash the node public key into a
944 // single RFC1751 word:
945 static std::string const shroudedHostId = [this]() {
946 auto const& id = app_.nodeIdentity();
947
948 return RFC1751::getWordFromBlob(id.first.data(), id.first.size());
949 }();
950
951 return shroudedHostId;
952}
953
954void
956{
958
959 // Only do this work if a cluster is configured
960 if (app_.cluster().size() != 0)
962}
963
964void
966 boost::asio::steady_timer& timer,
967 std::chrono::milliseconds const& expiry_time,
968 std::function<void()> onExpire,
969 std::function<void()> onError)
970{
971 // Only start the timer if waitHandlerCounter_ is not yet joined.
972 if (auto optionalCountedHandler = waitHandlerCounter_.wrap(
973 [this, onExpire, onError](boost::system::error_code const& e) {
974 if ((e.value() == boost::system::errc::success) &&
975 (!m_job_queue.isStopped()))
976 {
977 onExpire();
978 }
979 // Recover as best we can if an unexpected error occurs.
980 if (e.value() != boost::system::errc::success &&
981 e.value() != boost::asio::error::operation_aborted)
982 {
983 // Try again later and hope for the best.
984 JLOG(m_journal.error())
985 << "Timer got error '" << e.message()
986 << "'. Restarting timer.";
987 onError();
988 }
989 }))
990 {
991 timer.expires_after(expiry_time);
992 timer.async_wait(std::move(*optionalCountedHandler));
993 }
994}
995
996void
997NetworkOPsImp::setHeartbeatTimer()
998{
999 setTimer(
1000 heartbeatTimer_,
1001 mConsensus.parms().ledgerGRANULARITY,
1002 [this]() {
1003 m_job_queue.addJob(jtNETOP_TIMER, "NetOPs.heartbeat", [this]() {
1004 processHeartbeatTimer();
1005 });
1006 },
1007 [this]() { setHeartbeatTimer(); });
1008}
1009
1010void
1011NetworkOPsImp::setClusterTimer()
1012{
1013 using namespace std::chrono_literals;
1014
1015 setTimer(
1016 clusterTimer_,
1017 10s,
1018 [this]() {
1019 m_job_queue.addJob(jtNETOP_CLUSTER, "NetOPs.cluster", [this]() {
1020 processClusterTimer();
1021 });
1022 },
1023 [this]() { setClusterTimer(); });
1024}
1025
1026void
1027NetworkOPsImp::setAccountHistoryJobTimer(SubAccountHistoryInfoWeak subInfo)
1028{
1029 JLOG(m_journal.debug()) << "Scheduling AccountHistory job for account "
1030 << toBase58(subInfo.index_->accountId_);
1031 using namespace std::chrono_literals;
1032 setTimer(
1033 accountHistoryTxTimer_,
1034 4s,
1035 [this, subInfo]() { addAccountHistoryJob(subInfo); },
1036 [this, subInfo]() { setAccountHistoryJobTimer(subInfo); });
1037}
1038
/** Periodic heartbeat: maintain the operating mode and drive consensus.

    Under the master lock this checks peer connectivity (dropping to
    DISCONNECTED or recovering to CONNECTED as the peer count crosses
    minPeerCount_), then lets setMode() re-evaluate SYNCING/CONNECTED
    against the last validated ledger.  Outside the lock it calls into
    consensus and finally re-arms the heartbeat timer.
*/
void
NetworkOPsImp::processHeartbeatTimer()
{
    RclConsensusLogger clog(
        "Heartbeat Timer", mConsensus.validating(), m_journal);
    {
        std::unique_lock lock{app_.getMasterMutex()};

        // VFALCO NOTE This is for diagnosing a crash on exit
        LoadManager& mgr(app_.getLoadManager());
        mgr.heartbeat();

        std::size_t const numPeers = app_.overlay().size();

        // do we have sufficient peers? If not, we are disconnected.
        if (numPeers < minPeerCount_)
        {
            if (mMode != OperatingMode::DISCONNECTED)
            {
                // Transition to DISCONNECTED and record why.
                setMode(OperatingMode::DISCONNECTED);
                ss << "Node count (" << numPeers << ") has fallen "
                   << "below required minimum (" << minPeerCount_ << ").";
                JLOG(m_journal.warn()) << ss.str();
                CLOG(clog.ss()) << "set mode to DISCONNECTED: " << ss.str();
            }
            else
            {
                CLOG(clog.ss())
                    << "already DISCONNECTED. too few peers (" << numPeers
                    << "), need at least " << minPeerCount_;
            }

            // MasterMutex lock need not be held to call setHeartbeatTimer()
            lock.unlock();
            // We do not call mConsensus.timerEntry until there are enough
            // peers providing meaningful inputs to consensus
            setHeartbeatTimer();

            return;
        }

        if (mMode == OperatingMode::DISCONNECTED)
        {
            // Enough peers again: come back up to CONNECTED.
            setMode(OperatingMode::CONNECTED);
            JLOG(m_journal.info())
                << "Node count (" << numPeers << ") is sufficient.";
            CLOG(clog.ss()) << "setting mode to CONNECTED based on " << numPeers
                            << " peers. ";
        }

        // Check if the last validated ledger forces a change between these
        // states.  (Re-setting the same mode lets setMode() re-evaluate
        // whether a different state is now appropriate.)
        auto origMode = mMode.load();
        CLOG(clog.ss()) << "mode: " << strOperatingMode(origMode, true);
        if (mMode == OperatingMode::SYNCING)
            setMode(OperatingMode::SYNCING);
        else if (mMode == OperatingMode::CONNECTED)
            setMode(OperatingMode::CONNECTED);
        auto newMode = mMode.load();
        if (origMode != newMode)
        {
            CLOG(clog.ss())
                << ", changing to " << strOperatingMode(newMode, true);
        }
        CLOG(clog.ss()) << ". ";
    }

    // Drive the consensus state machine with the current network close time.
    mConsensus.timerEntry(app_.timeKeeper().closeTime(), clog.ss());

    CLOG(clog.ss()) << "consensus phase " << to_string(mLastConsensusPhase);
    ConsensusPhase const currPhase = mConsensus.phase();
    if (mLastConsensusPhase != currPhase)
    {
        // Publish the phase change to subscribers and remember it.
        reportConsensusStateChange(currPhase);
        mLastConsensusPhase = currPhase;
        CLOG(clog.ss()) << " changed to " << to_string(mLastConsensusPhase);
    }
    CLOG(clog.ss()) << ". ";

    setHeartbeatTimer();
}
1121
1122void
1123NetworkOPsImp::processClusterTimer()
1124{
1125 if (app_.cluster().size() == 0)
1126 return;
1127
1128 using namespace std::chrono_literals;
1129
1130 bool const update = app_.cluster().update(
1131 app_.nodeIdentity().first,
1132 "",
1133 (m_ledgerMaster.getValidatedLedgerAge() <= 4min)
1134 ? app_.getFeeTrack().getLocalFee()
1135 : 0,
1136 app_.timeKeeper().now());
1137
1138 if (!update)
1139 {
1140 JLOG(m_journal.debug()) << "Too soon to send cluster update";
1141 setClusterTimer();
1142 return;
1143 }
1144
1145 protocol::TMCluster cluster;
1146 app_.cluster().for_each([&cluster](ClusterNode const& node) {
1147 protocol::TMClusterNode& n = *cluster.add_clusternodes();
1148 n.set_publickey(toBase58(TokenType::NodePublic, node.identity()));
1149 n.set_reporttime(node.getReportTime().time_since_epoch().count());
1150 n.set_nodeload(node.getLoadFee());
1151 if (!node.name().empty())
1152 n.set_nodename(node.name());
1153 });
1154
1155 Resource::Gossip gossip = app_.getResourceManager().exportConsumers();
1156 for (auto& item : gossip.items)
1157 {
1158 protocol::TMLoadSource& node = *cluster.add_loadsources();
1159 node.set_name(to_string(item.address));
1160 node.set_cost(item.balance);
1161 }
1162 app_.overlay().foreach(send_if(
1163 std::make_shared<Message>(cluster, protocol::mtCLUSTER),
1164 peer_in_cluster()));
1165 setClusterTimer();
1166}
1167
1168//------------------------------------------------------------------------------
1169
// Return a string describing the given operating mode.  For admin callers
// in FULL mode, refine the answer to "proposing"/"validating" based on the
// consensus engine's current role; otherwise map the mode through states_.
NetworkOPsImp::strOperatingMode(OperatingMode const mode, bool const admin)
    const
{
    if (mode == OperatingMode::FULL && admin)
    {
        auto const consensusMode = mConsensus.mode();
        if (consensusMode != ConsensusMode::wrongLedger)
        {
            if (consensusMode == ConsensusMode::proposing)
                return "proposing";

            if (mConsensus.validating())
                return "validating";
        }
    }

    // Plain mode name ("disconnected", "connected", "syncing", ...).
    return states_[static_cast<std::size_t>(mode)];
}
1189
1190void
1191NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)
1192{
1193 if (isNeedNetworkLedger())
1194 {
1195 // Nothing we can do if we've never been in sync
1196 return;
1197 }
1198
1199 // Enforce Network bar for batch txn
1200 if (iTrans->isFlag(tfInnerBatchTxn) &&
1201 m_ledgerMaster.getValidatedRules().enabled(featureBatch))
1202 {
1203 JLOG(m_journal.error())
1204 << "Submitted transaction invalid: tfInnerBatchTxn flag present.";
1205 return;
1206 }
1207
1208 // this is an asynchronous interface
1209 auto const trans = sterilize(*iTrans);
1210
1211 auto const txid = trans->getTransactionID();
1212 auto const flags = app_.getHashRouter().getFlags(txid);
1213
1214 if ((flags & HashRouterFlags::BAD) != HashRouterFlags::UNDEFINED)
1215 {
1216 JLOG(m_journal.warn()) << "Submitted transaction cached bad";
1217 return;
1218 }
1219
1220 try
1221 {
1222 auto const [validity, reason] = checkValidity(
1223 app_.getHashRouter(),
1224 *trans,
1225 m_ledgerMaster.getValidatedRules(),
1226 app_.config());
1227
1228 if (validity != Validity::Valid)
1229 {
1230 JLOG(m_journal.warn())
1231 << "Submitted transaction invalid: " << reason;
1232 return;
1233 }
1234 }
1235 catch (std::exception const& ex)
1236 {
1237 JLOG(m_journal.warn())
1238 << "Exception checking transaction " << txid << ": " << ex.what();
1239
1240 return;
1241 }
1242
1243 std::string reason;
1244
1245 auto tx = std::make_shared<Transaction>(trans, reason, app_);
1246
1247 m_job_queue.addJob(jtTRANSACTION, "submitTxn", [this, tx]() {
1248 auto t = tx;
1249 processTransaction(t, false, false, FailHard::no);
1250 });
1251}
1252
1253bool
1254NetworkOPsImp::preProcessTransaction(std::shared_ptr<Transaction>& transaction)
1255{
1256 auto const newFlags = app_.getHashRouter().getFlags(transaction->getID());
1257
1258 if ((newFlags & HashRouterFlags::BAD) != HashRouterFlags::UNDEFINED)
1259 {
1260 // cached bad
1261 JLOG(m_journal.warn()) << transaction->getID() << ": cached bad!\n";
1262 transaction->setStatus(INVALID);
1263 transaction->setResult(temBAD_SIGNATURE);
1264 return false;
1265 }
1266
1267 auto const view = m_ledgerMaster.getCurrentLedger();
1268
1269 // This function is called by several different parts of the codebase
1270 // under no circumstances will we ever accept an inner txn within a batch
1271 // txn from the network.
1272 auto const sttx = *transaction->getSTransaction();
1273 if (sttx.isFlag(tfInnerBatchTxn) && view->rules().enabled(featureBatch))
1274 {
1275 transaction->setStatus(INVALID);
1276 transaction->setResult(temINVALID_FLAG);
1277 app_.getHashRouter().setFlags(
1278 transaction->getID(), HashRouterFlags::BAD);
1279 return false;
1280 }
1281
1282 // NOTE eahennis - I think this check is redundant,
1283 // but I'm not 100% sure yet.
1284 // If so, only cost is looking up HashRouter flags.
1285 auto const [validity, reason] =
1286 checkValidity(app_.getHashRouter(), sttx, view->rules(), app_.config());
1287 XRPL_ASSERT(
1288 validity == Validity::Valid,
1289 "ripple::NetworkOPsImp::processTransaction : valid validity");
1290
1291 // Not concerned with local checks at this point.
1292 if (validity == Validity::SigBad)
1293 {
1294 JLOG(m_journal.info()) << "Transaction has bad signature: " << reason;
1295 transaction->setStatus(INVALID);
1296 transaction->setResult(temBAD_SIGNATURE);
1297 app_.getHashRouter().setFlags(
1298 transaction->getID(), HashRouterFlags::BAD);
1299 return false;
1300 }
1301
1302 // canonicalize can change our pointer
1303 app_.getMasterTransaction().canonicalize(&transaction);
1304
1305 return true;
1306}
1307
1308void
1309NetworkOPsImp::processTransaction(
1310 std::shared_ptr<Transaction>& transaction,
1311 bool bUnlimited,
1312 bool bLocal,
1313 FailHard failType)
1314{
1315 auto ev = m_job_queue.makeLoadEvent(jtTXN_PROC, "ProcessTXN");
1316
1317 // preProcessTransaction can change our pointer
1318 if (!preProcessTransaction(transaction))
1319 return;
1320
1321 if (bLocal)
1322 doTransactionSync(transaction, bUnlimited, failType);
1323 else
1324 doTransactionAsync(transaction, bUnlimited, failType);
1325}
1326
1327void
1328NetworkOPsImp::doTransactionAsync(
1329 std::shared_ptr<Transaction> transaction,
1330 bool bUnlimited,
1331 FailHard failType)
1332{
1333 std::lock_guard lock(mMutex);
1334
1335 if (transaction->getApplying())
1336 return;
1337
1338 mTransactions.push_back(
1339 TransactionStatus(transaction, bUnlimited, false, failType));
1340 transaction->setApplying();
1341
1342 if (mDispatchState == DispatchState::none)
1343 {
1344 if (m_job_queue.addJob(
1345 jtBATCH, "transactionBatch", [this]() { transactionBatch(); }))
1346 {
1347 mDispatchState = DispatchState::scheduled;
1348 }
1349 }
1350}
1351
1352void
1353NetworkOPsImp::doTransactionSync(
1354 std::shared_ptr<Transaction> transaction,
1355 bool bUnlimited,
1356 FailHard failType)
1357{
1358 std::unique_lock<std::mutex> lock(mMutex);
1359
1360 if (!transaction->getApplying())
1361 {
1362 mTransactions.push_back(
1363 TransactionStatus(transaction, bUnlimited, true, failType));
1364 transaction->setApplying();
1365 }
1366
1367 doTransactionSyncBatch(
1368 lock, [&transaction](std::unique_lock<std::mutex> const&) {
1369 return transaction->getApplying();
1370 });
1371}
1372
/** Drain pending transaction batches until the caller's condition clears.

    Called with mMutex held (via `lock`).  If another batch job is already
    running, waits on the condition variable; otherwise applies the pending
    batch directly and, if more work remains, schedules another batch job.
    Loops while retryCallback returns true.
*/
void
NetworkOPsImp::doTransactionSyncBatch(
    std::function<bool(std::unique_lock<std::mutex> const&)> retryCallback)
{
    do
    {
        if (mDispatchState == DispatchState::running)
        {
            // A batch processing job is already running, so wait.
            mCond.wait(lock);
        }
        else
        {
            apply(lock);

            if (mTransactions.size())
            {
                // More transactions need to be applied, but by another job.
                if (m_job_queue.addJob(jtBATCH, "transactionBatch", [this]() {
                        transactionBatch();
                    }))
                {
                    mDispatchState = DispatchState::scheduled;
                }
            }
        }
    } while (retryCallback(lock));
}
1402
/** Process an entire canonical set of transactions as one batch.

    Screens each transaction in the set (dropping invalid or cached-bad
    ones), marks the survivors as applying, merges them into the pending
    batch, and drains batches until every transaction from this set has
    been applied.
*/
void
NetworkOPsImp::processTransactionSet(CanonicalTXSet const& set)
{
    auto ev = m_job_queue.makeLoadEvent(jtTXN_PROC, "ProcessTXNSet");
    candidates.reserve(set.size());
    for (auto const& [_, tx] : set)
    {
        std::string reason;
        auto transaction = std::make_shared<Transaction>(tx, reason, app_);

        if (transaction->getStatus() == INVALID)
        {
            if (!reason.empty())
            {
                JLOG(m_journal.trace())
                    << "Exception checking transaction: " << reason;
            }
            // Remember this transaction as bad so it is rejected quickly
            // next time.
            app_.getHashRouter().setFlags(
                tx->getTransactionID(), HashRouterFlags::BAD);
            continue;
        }

        // preProcessTransaction can change our pointer
        if (!preProcessTransaction(transaction))
            continue;

        candidates.emplace_back(transaction);
    }

    std::vector<TransactionStatus> transactions;
    transactions.reserve(candidates.size());

    std::unique_lock lock(mMutex);

    // Only queue candidates not already being applied elsewhere.
    for (auto& transaction : candidates)
    {
        if (!transaction->getApplying())
        {
            transactions.emplace_back(transaction, false, false, FailHard::no);
            transaction->setApplying();
        }
    }

    // Merge into the shared pending batch; swap is the cheap path when the
    // batch is currently empty.
    if (mTransactions.empty())
        mTransactions.swap(transactions);
    else
    {
        mTransactions.reserve(mTransactions.size() + transactions.size());
        for (auto& t : transactions)
            mTransactions.push_back(std::move(t));
    }
    if (mTransactions.empty())
    {
        JLOG(m_journal.debug()) << "No transaction to process!";
        return;
    }

    // Drain batches while any transaction in the batch is still applying.
    doTransactionSyncBatch(lock, [&](std::unique_lock<std::mutex> const&) {
        XRPL_ASSERT(
            lock.owns_lock(),
            "ripple::NetworkOPsImp::processTransactionSet has lock");
        return std::any_of(
            mTransactions.begin(), mTransactions.end(), [](auto const& t) {
                return t.transaction->getApplying();
            });
    });
}
1471
1472void
1473NetworkOPsImp::transactionBatch()
1474{
1475 std::unique_lock<std::mutex> lock(mMutex);
1476
1477 if (mDispatchState == DispatchState::running)
1478 return;
1479
1480 while (mTransactions.size())
1481 {
1482 apply(lock);
1483 }
1484}
1485
// Apply a batch of pending transactions to the open ledger.
//
// Entered with `batchLock` (mMutex) held. The shared queue is swapped out
// under the lock, the lock is released while the batch is applied under the
// master and ledger mutexes, and it is re-acquired before the shared state
// (mTransactions, applying flags, mCond) is touched again.
//
// NOTE(review): a declaration near the top of this function — presumably
// `std::vector<TransactionStatus> submit_held;`, used below — is elided in
// this view of the file; confirm against the repository.
void
NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
{
    // Take ownership of the queued batch while holding the lock.
    std::vector<TransactionStatus> transactions;
    mTransactions.swap(transactions);
    XRPL_ASSERT(
        !transactions.empty(),
        "ripple::NetworkOPsImp::apply : non-empty transactions");
    XRPL_ASSERT(
        mDispatchState != DispatchState::running,
        "ripple::NetworkOPsImp::apply : is not running");

    mDispatchState = DispatchState::running;

    batchLock.unlock();

    {
        // Acquire the master and ledger mutexes together (deadlock-free)
        // before mutating the open ledger.
        std::unique_lock masterLock{app_.getMasterMutex(), std::defer_lock};
        bool changed = false;
        {
            std::unique_lock ledgerLock{
                m_ledgerMaster.peekMutex(), std::defer_lock};
            std::lock(masterLock, ledgerLock);

            app_.openLedger().modify([&](OpenView& view, beast::Journal j) {
                for (TransactionStatus& e : transactions)
                {
                    // we check before adding to the batch
                    ApplyFlags flags = tapNONE;
                    if (e.admin)
                        flags |= tapUNLIMITED;

                    if (e.failType == FailHard::yes)
                        flags |= tapFAIL_HARD;

                    auto const result = app_.getTxQ().apply(
                        app_, view, e.transaction->getSTransaction(), flags, j);
                    e.result = result.ter;
                    e.applied = result.applied;
                    changed = changed || result.applied;
                }
                return changed;
            });
        }
        if (changed)
            reportFeeChange();

        std::optional<LedgerIndex> validatedLedgerIndex;
        if (auto const l = m_ledgerMaster.getValidatedLedger())
            validatedLedgerIndex = l->info().seq;

        auto newOL = app_.openLedger().current();
        // Post-apply bookkeeping for each transaction: publish, set status,
        // decide whether to hold locally and/or relay to peers.
        for (TransactionStatus& e : transactions)
        {
            e.transaction->clearSubmitResult();

            if (e.applied)
            {
                pubProposedTransaction(
                    newOL, e.transaction->getSTransaction(), e.result);
                e.transaction->setApplied();
            }

            e.transaction->setResult(e.result);

            if (isTemMalformed(e.result))
                app_.getHashRouter().setFlags(
                    e.transaction->getID(), HashRouterFlags::BAD);

#ifdef DEBUG
            if (e.result != tesSUCCESS)
            {
                std::string token, human;

                if (transResultInfo(e.result, token, human))
                {
                    JLOG(m_journal.info())
                        << "TransactionResult: " << token << ": " << human;
                }
            }
#endif

            bool addLocal = e.local;

            if (e.result == tesSUCCESS)
            {
                JLOG(m_journal.debug())
                    << "Transaction is now included in open ledger";
                e.transaction->setStatus(INCLUDED);

                // Pop as many "reasonable" transactions for this account as
                // possible. "Reasonable" means they have sequential sequence
                // numbers, or use tickets.
                auto const& txCur = e.transaction->getSTransaction();

                std::size_t count = 0;
                for (auto txNext = m_ledgerMaster.popAcctTransaction(txCur);
                     txNext && count < maxPoppedTransactions;
                     txNext = m_ledgerMaster.popAcctTransaction(txCur), ++count)
                {
                    // The applying flag is shared state, so re-take the
                    // batch lock while marking popped transactions.
                    if (!batchLock.owns_lock())
                        batchLock.lock();
                    std::string reason;
                    auto const trans = sterilize(*txNext);
                    auto t = std::make_shared<Transaction>(trans, reason, app_);
                    if (t->getApplying())
                        break;
                    submit_held.emplace_back(t, false, false, FailHard::no);
                    t->setApplying();
                }
                if (batchLock.owns_lock())
                    batchLock.unlock();
            }
            else if (e.result == tefPAST_SEQ)
            {
                // duplicate or conflict
                JLOG(m_journal.info()) << "Transaction is obsolete";
                e.transaction->setStatus(OBSOLETE);
            }
            else if (e.result == terQUEUED)
            {
                JLOG(m_journal.debug())
                    << "Transaction is likely to claim a"
                    << " fee, but is queued until fee drops";

                e.transaction->setStatus(HELD);
                // Add to held transactions, because it could get
                // kicked out of the queue, and this will try to
                // put it back.
                m_ledgerMaster.addHeldTransaction(e.transaction);
                e.transaction->setQueued();
                e.transaction->setKept();
            }
            else if (
                isTerRetry(e.result) || isTelLocal(e.result) ||
                isTefFailure(e.result))
            {
                if (e.failType != FailHard::yes)
                {
                    auto const lastLedgerSeq =
                        e.transaction->getSTransaction()->at(
                            ~sfLastLedgerSequence);
                    // NOTE(review): the alternative branch of this ternary
                    // (the value when lastLedgerSeq is unseated) is elided
                    // in this view of the file — confirm against the repo.
                    auto const ledgersLeft = lastLedgerSeq
                        ? *lastLedgerSeq -
                            m_ledgerMaster.getCurrentLedgerIndex()
                    // If any of these conditions are met, the transaction can
                    // be held:
                    // 1. It was submitted locally. (Note that this flag is only
                    //    true on the initial submission.)
                    // 2. The transaction has a LastLedgerSequence, and the
                    //    LastLedgerSequence is fewer than LocalTxs::holdLedgers
                    //    (5) ledgers into the future. (Remember that an
                    //    unseated optional compares as less than all seated
                    //    values, so it has to be checked explicitly first.)
                    // 3. The HashRouterFlags::BAD flag is not set on the txID.
                    //    (setFlags
                    //    checks before setting. If the flag is set, it returns
                    //    false, which means it's been held once without one of
                    //    the other conditions, so don't hold it again. Time's
                    //    up!)
                    //
                    if (e.local ||
                        (ledgersLeft && ledgersLeft <= LocalTxs::holdLedgers) ||
                        app_.getHashRouter().setFlags(
                            e.transaction->getID(), HashRouterFlags::HELD))
                    {
                        // transaction should be held
                        JLOG(m_journal.debug())
                            << "Transaction should be held: " << e.result;
                        e.transaction->setStatus(HELD);
                        m_ledgerMaster.addHeldTransaction(e.transaction);
                        e.transaction->setKept();
                    }
                    else
                        JLOG(m_journal.debug())
                            << "Not holding transaction "
                            << e.transaction->getID() << ": "
                            << (e.local ? "local" : "network") << ", "
                            << "result: " << e.result << " ledgers left: "
                            << (ledgersLeft ? to_string(*ledgersLeft)
                                            : "unspecified");
                }
            }
            else
            {
                JLOG(m_journal.debug())
                    << "Status other than success " << e.result;
                e.transaction->setStatus(INVALID);
            }

            // fail_hard submissions that didn't succeed are neither kept
            // locally nor relayed.
            auto const enforceFailHard =
                e.failType == FailHard::yes && !isTesSuccess(e.result);

            if (addLocal && !enforceFailHard)
            {
                m_localTX->push_back(
                    m_ledgerMaster.getCurrentLedgerIndex(),
                    e.transaction->getSTransaction());
                e.transaction->setKept();
            }

            // Relay if the tx applied, was queued, or is a local submission
            // while we're not FULL (best-effort forwarding).
            if ((e.applied ||
                 ((mMode != OperatingMode::FULL) &&
                  (e.failType != FailHard::yes) && e.local) ||
                 (e.result == terQUEUED)) &&
                !enforceFailHard)
            {
                auto const toSkip =
                    app_.getHashRouter().shouldRelay(e.transaction->getID());
                if (auto const sttx = *(e.transaction->getSTransaction());
                    toSkip &&
                    // Skip relaying if it's an inner batch txn and batch
                    // feature is enabled
                    !(sttx.isFlag(tfInnerBatchTxn) &&
                      newOL->rules().enabled(featureBatch)))
                {
                    protocol::TMTransaction tx;
                    Serializer s;

                    sttx.add(s);
                    tx.set_rawtransaction(s.data(), s.size());
                    tx.set_status(protocol::tsCURRENT);
                    tx.set_receivetimestamp(
                        app_.timeKeeper().now().time_since_epoch().count());
                    tx.set_deferred(e.result == terQUEUED);
                    // FIXME: This should be when we received it
                    app_.overlay().relay(e.transaction->getID(), tx, *toSkip);
                    e.transaction->setBroadcast();
                }
            }

            if (validatedLedgerIndex)
            {
                auto [fee, accountSeq, availableSeq] =
                    app_.getTxQ().getTxRequiredFeeAndSeq(
                        *newOL, e.transaction->getSTransaction());
                e.transaction->setCurrentLedgerState(
                    *validatedLedgerIndex, fee, accountSeq, availableSeq);
            }
        }
    }

    // Re-acquire the batch lock before touching shared queue state.
    batchLock.lock();

    for (TransactionStatus& e : transactions)
        e.transaction->clearApplying();

    // Requeue any follow-on account transactions popped above.
    if (!submit_held.empty())
    {
        if (mTransactions.empty())
            mTransactions.swap(submit_held);
        else
        {
            mTransactions.reserve(mTransactions.size() + submit_held.size());
            for (auto& e : submit_held)
                mTransactions.push_back(std::move(e));
        }
    }

    mCond.notify_all();

    mDispatchState = DispatchState::none;
}
1751
1752//
1753// Owner functions
1754//
1755
// Build a JSON summary of the ledger objects owned by `account` by walking
// the pages of the account's owner directory: offers are collected under
// jss::offers and trust lines under jss::ripple_lines.
//
// NOTE(review): the return type line and the ledger parameter (`lpLedger`,
// read below) of this signature, and the array initializers on two
// assignment lines in the switch, are elided in this view of the file —
// confirm against the repository.
NetworkOPsImp::getOwnerInfo(
    AccountID const& account)
{
    Json::Value jvObjects(Json::objectValue);
    // Keylet of the account's owner directory root page.
    auto root = keylet::ownerDir(account);
    auto sleNode = lpLedger->read(keylet::page(root));
    if (sleNode)
    {
        std::uint64_t uNodeDir;

        do
        {
            // Each directory page lists the keys of the owned objects.
            for (auto const& uDirEntry : sleNode->getFieldV256(sfIndexes))
            {
                auto sleCur = lpLedger->read(keylet::child(uDirEntry));
                XRPL_ASSERT(
                    sleCur,
                    "ripple::NetworkOPsImp::getOwnerInfo : non-null child SLE");

                switch (sleCur->getType())
                {
                    case ltOFFER:
                        if (!jvObjects.isMember(jss::offers))
                            jvObjects[jss::offers] =

                        jvObjects[jss::offers].append(
                            sleCur->getJson(JsonOptions::none));
                        break;

                    case ltRIPPLE_STATE:
                        if (!jvObjects.isMember(jss::ripple_lines))
                        {
                            jvObjects[jss::ripple_lines] =
                        }

                        jvObjects[jss::ripple_lines].append(
                            sleCur->getJson(JsonOptions::none));
                        break;

                    case ltACCOUNT_ROOT:
                    case ltDIR_NODE:
                    // LCOV_EXCL_START
                    default:
                        UNREACHABLE(
                            "ripple::NetworkOPsImp::getOwnerInfo : invalid "
                            "type");
                        break;
                        // LCOV_EXCL_STOP
                }
            }

            // Follow the linked list of directory pages until exhausted.
            uNodeDir = sleNode->getFieldU64(sfIndexNext);

            if (uNodeDir)
            {
                sleNode = lpLedger->read(keylet::page(root, uNodeDir));
                XRPL_ASSERT(
                    sleNode,
                    "ripple::NetworkOPsImp::getOwnerInfo : read next page");
            }
        } while (uNodeDir);
    }

    return jvObjects;
}
1825
1826//
1827// Other
1828//
1829
1830inline bool
1831NetworkOPsImp::isBlocked()
1832{
1833 return isAmendmentBlocked() || isUNLBlocked();
1834}
1835
// Report whether this server is amendment blocked (see setAmendmentBlocked).
inline bool
NetworkOPsImp::isAmendmentBlocked()
{
    return amendmentBlocked_;
}
1841
// Mark this server amendment blocked and drop the operating mode to
// CONNECTED: the flag is set first so that any mode logic observing
// isBlocked() sees the new state.
void
NetworkOPsImp::setAmendmentBlocked()
{
    amendmentBlocked_ = true;
    setMode(OperatingMode::CONNECTED);
}
1848
1849inline bool
1850NetworkOPsImp::isAmendmentWarned()
1851{
1852 return !amendmentBlocked_ && amendmentWarned_;
1853}
1854
// Raise the amendment warning flag (an unsupported amendment has reached
// majority; see isAmendmentWarned for how it is reported).
inline void
NetworkOPsImp::setAmendmentWarned()
{
    amendmentWarned_ = true;
}
1860
// Clear the amendment warning flag.
inline void
NetworkOPsImp::clearAmendmentWarned()
{
    amendmentWarned_ = false;
}
1866
// Report whether this server is UNL blocked (see setUNLBlocked).
inline bool
NetworkOPsImp::isUNLBlocked()
{
    return unlBlocked_;
}
1872
// Mark this server UNL blocked (e.g. an expired validator list) and drop the
// operating mode to CONNECTED; the flag is set before the mode change so
// isBlocked() reflects the new state.
void
NetworkOPsImp::setUNLBlocked()
{
    unlBlocked_ = true;
    setMode(OperatingMode::CONNECTED);
}
1879
// Clear the UNL-blocked flag.
inline void
NetworkOPsImp::clearUNLBlocked()
{
    unlBlocked_ = false;
}
1885
// Decide whether our last closed ledger matches the network's preferred
// last closed ledger, and switch to the network's if not.
//
// NOTE(review): the declaration of `peerCounts` (a hash-to-count map used
// below, presumably keyed by ledger hash) is elided in this view of the
// file — confirm against the repository.
bool
NetworkOPsImp::checkLastClosedLedger(
    Overlay::PeerSequence const& peerList,
    uint256& networkClosed)
{
    // Returns true if there's an *abnormal* ledger issue, normal changing in
    // TRACKING mode should return false. Do we have sufficient validations for
    // our last closed ledger? Or do sufficient nodes agree? And do we have no
    // better ledger available? If so, we are either tracking or full.

    JLOG(m_journal.trace()) << "NetworkOPsImp::checkLastClosedLedger";

    auto const ourClosed = m_ledgerMaster.getClosedLedger();

    if (!ourClosed)
        return false;

    uint256 closedLedger = ourClosed->info().hash;
    uint256 prevClosedLedger = ourClosed->info().parentHash;
    JLOG(m_journal.trace()) << "OurClosed: " << closedLedger;
    JLOG(m_journal.trace()) << "PrevClosed: " << prevClosedLedger;

    //-------------------------------------------------------------------------
    // Determine preferred last closed ledger

    auto& validations = app_.getValidations();
    JLOG(m_journal.debug())
        << "ValidationTrie " << Json::Compact(validations.getJsonTrie());

    // Will rely on peer LCL if no trusted validations exist
    peerCounts[closedLedger] = 0;
    if (mMode >= OperatingMode::TRACKING)
        peerCounts[closedLedger]++;

    // Tally each peer's advertised closed ledger.
    for (auto& peer : peerList)
    {
        uint256 peerLedger = peer->getClosedLedgerHash();

        if (peerLedger.isNonZero())
            ++peerCounts[peerLedger];
    }

    for (auto const& it : peerCounts)
        JLOG(m_journal.debug()) << "L: " << it.first << " n=" << it.second;

    uint256 preferredLCL = validations.getPreferredLCL(
        RCLValidatedLedger{ourClosed, validations.adaptor().journal()},
        m_ledgerMaster.getValidLedgerIndex(),
        peerCounts);

    bool switchLedgers = preferredLCL != closedLedger;
    if (switchLedgers)
        closedLedger = preferredLCL;
    //-------------------------------------------------------------------------
    if (switchLedgers && (closedLedger == prevClosedLedger))
    {
        // don't switch to our own previous ledger
        JLOG(m_journal.info()) << "We won't switch to our own previous ledger";
        networkClosed = ourClosed->info().hash;
        switchLedgers = false;
    }
    else
        networkClosed = closedLedger;

    if (!switchLedgers)
        return false;

    // Fetch the preferred ledger, acquiring from peers if necessary.
    auto consensus = m_ledgerMaster.getLedgerByHash(closedLedger);

    if (!consensus)
        consensus = app_.getInboundLedgers().acquire(
            closedLedger, 0, InboundLedger::Reason::CONSENSUS);

    if (consensus &&
        (!m_ledgerMaster.canBeCurrent(consensus) ||
         !m_ledgerMaster.isCompatible(
             *consensus, m_journal.debug(), "Not switching")))
    {
        // Don't switch to a ledger not on the validated chain
        // or with an invalid close time or sequence
        networkClosed = ourClosed->info().hash;
        return false;
    }

    JLOG(m_journal.warn()) << "We are not running on the consensus ledger";
    JLOG(m_journal.info()) << "Our LCL: " << ourClosed->info().hash
                           << getJson({*ourClosed, {}});
    JLOG(m_journal.info()) << "Net LCL " << closedLedger;

    // We're off the network ledger, so we can no longer claim to be synced.
    if ((mMode == OperatingMode::TRACKING) || (mMode == OperatingMode::FULL))
    {
        setMode(OperatingMode::CONNECTED);
    }

    if (consensus)
    {
        // FIXME: If this rewinds the ledger sequence, or has the same
        // sequence, we should update the status on any stored transactions
        // in the invalidated ledgers.
        switchLastClosedLedger(consensus);
    }

    return true;
}
1991
// Adopt `newLCL` as our last closed ledger (abnormal "ledger jump" path):
// rebuild the open ledger on top of it, retrying local transactions, then
// announce the switch to peers.
//
// NOTE(review): the declaration of `rules` (presumably
// std::optional<Rules>) between the two lines below is elided in this view
// of the file — confirm against the repository.
void
NetworkOPsImp::switchLastClosedLedger(
    std::shared_ptr<Ledger const> const& newLCL)
{
    // set the newLCL as our last closed ledger -- this is abnormal code
    JLOG(m_journal.error())
        << "JUMP last closed ledger to " << newLCL->info().hash;

    clearNeedNetworkLedger();

    // Update fee computations.
    app_.getTxQ().processClosedLedger(app_, *newLCL, true);

    // Caller must own master lock
    {
        // Apply tx in old open ledger to new
        // open ledger. Then apply local tx.

        auto retries = m_localTX->getTxSet();
        auto const lastVal = app_.getLedgerMaster().getValidatedLedger();
        if (lastVal)
            rules = makeRulesGivenLedger(*lastVal, app_.config().features);
        else
            rules.emplace(app_.config().features);
        app_.openLedger().accept(
            app_,
            *rules,
            newLCL,
            OrderedTxs({}),
            false,
            retries,
            tapNONE,
            "jump",
            [&](OpenView& view, beast::Journal j) {
                // Stuff the ledger with transactions from the queue.
                return app_.getTxQ().accept(app_, view);
            });
    }

    m_ledgerMaster.switchLCL(newLCL);

    // Tell peers we switched ledgers.
    protocol::TMStatusChange s;
    s.set_newevent(protocol::neSWITCHED_LEDGER);
    s.set_ledgerseq(newLCL->info().seq);
    s.set_networktime(app_.timeKeeper().now().time_since_epoch().count());
    s.set_ledgerhashprevious(
        newLCL->info().parentHash.begin(), newLCL->info().parentHash.size());
    s.set_ledgerhash(newLCL->info().hash.begin(), newLCL->info().hash.size());

    app_.overlay().foreach(
        send_always(std::make_shared<Message>(s, protocol::mtSTATUS_CHANGE)));
}
2045
// Start a new consensus round building on our current ledger's parent.
// Returns false if the previous ledger is unavailable.
//
// NOTE(review): the final parameter of this signature (presumably
// std::unique_ptr<std::stringstream> const& clog, used below) is elided in
// this view of the file — confirm against the repository.
bool
NetworkOPsImp::beginConsensus(
    uint256 const& networkClosed,
{
    XRPL_ASSERT(
        networkClosed.isNonZero(),
        "ripple::NetworkOPsImp::beginConsensus : nonzero input");

    auto closingInfo = m_ledgerMaster.getCurrentLedger()->info();

    JLOG(m_journal.info()) << "Consensus time for #" << closingInfo.seq
                           << " with LCL " << closingInfo.parentHash;

    // The round builds on the current ledger's parent; we must have it.
    auto prevLedger = m_ledgerMaster.getLedgerByHash(closingInfo.parentHash);

    if (!prevLedger)
    {
        // this shouldn't happen unless we jump ledgers
        if (mMode == OperatingMode::FULL)
        {
            JLOG(m_journal.warn()) << "Don't have LCL, going to tracking";
            setMode(OperatingMode::TRACKING);
            CLOG(clog) << "beginConsensus Don't have LCL, going to tracking. ";
        }

        CLOG(clog) << "beginConsensus no previous ledger. ";
        return false;
    }

    XRPL_ASSERT(
        prevLedger->info().hash == closingInfo.parentHash,
        "ripple::NetworkOPsImp::beginConsensus : prevLedger hash matches "
        "parent");
    XRPL_ASSERT(
        closingInfo.parentHash == m_ledgerMaster.getClosedLedger()->info().hash,
        "ripple::NetworkOPsImp::beginConsensus : closedLedger parent matches "
        "hash");

    // Refresh the negative UNL and the trusted validator set before the
    // round starts.
    if (prevLedger->rules().enabled(featureNegativeUNL))
        app_.validators().setNegativeUNL(prevLedger->negativeUNL());
    TrustChanges const changes = app_.validators().updateTrusted(
        app_.getValidations().getCurrentNodeIDs(),
        closingInfo.parentCloseTime,
        *this,
        app_.overlay(),
        app_.getHashRouter());

    if (!changes.added.empty() || !changes.removed.empty())
    {
        app_.getValidations().trustChanged(changes.added, changes.removed);
        // Update the AmendmentTable so it tracks the current validators.
        app_.getAmendmentTable().trustChanged(
            app_.validators().getQuorumKeys().second);
    }

    mConsensus.startRound(
        app_.timeKeeper().closeTime(),
        networkClosed,
        prevLedger,
        changes.removed,
        changes.added,
        clog);

    // Report a phase change to subscribers if starting the round moved us.
    ConsensusPhase const currPhase = mConsensus.phase();
    if (mLastConsensusPhase != currPhase)
    {
        reportConsensusStateChange(currPhase);
        mLastConsensusPhase = currPhase;
    }

    JLOG(m_journal.debug()) << "Initiating consensus engine";
    return true;
}
2120
2121bool
2122NetworkOPsImp::processTrustedProposal(RCLCxPeerPos peerPos)
2123{
2124 auto const& peerKey = peerPos.publicKey();
2125 if (validatorPK_ == peerKey || validatorMasterPK_ == peerKey)
2126 {
2127 // Could indicate a operator misconfiguration where two nodes are
2128 // running with the same validator key configured, so this isn't fatal,
2129 // and it doesn't necessarily indicate peer misbehavior. But since this
2130 // is a trusted message, it could be a very big deal. Either way, we
2131 // don't want to relay the proposal. Note that the byzantine behavior
2132 // detection in handleNewValidation will notify other peers.
2133 //
2134 // Another, innocuous explanation is unusual message routing and delays,
2135 // causing this node to receive its own messages back.
2136 JLOG(m_journal.error())
2137 << "Received a proposal signed by MY KEY from a peer. This may "
2138 "indicate a misconfiguration where another node has the same "
2139 "validator key, or may be caused by unusual message routing and "
2140 "delays.";
2141 return false;
2142 }
2143
2144 return mConsensus.peerProposal(app_.timeKeeper().closeTime(), peerPos);
2145}
2146
2147void
2148NetworkOPsImp::mapComplete(std::shared_ptr<SHAMap> const& map, bool fromAcquire)
2149{
2150 // We now have an additional transaction set
2151 // either created locally during the consensus process
2152 // or acquired from a peer
2153
2154 // Inform peers we have this set
2155 protocol::TMHaveTransactionSet msg;
2156 msg.set_hash(map->getHash().as_uint256().begin(), 256 / 8);
2157 msg.set_status(protocol::tsHAVE);
2158 app_.overlay().foreach(
2159 send_always(std::make_shared<Message>(msg, protocol::mtHAVE_SET)));
2160
2161 // We acquired it because consensus asked us to
2162 if (fromAcquire)
2163 mConsensus.gotTxSet(app_.timeKeeper().closeTime(), RCLTxSet{map});
2164}
2165
2166void
2167NetworkOPsImp::endConsensus(std::unique_ptr<std::stringstream> const& clog)
2168{
2169 uint256 deadLedger = m_ledgerMaster.getClosedLedger()->info().parentHash;
2170
2171 for (auto const& it : app_.overlay().getActivePeers())
2172 {
2173 if (it && (it->getClosedLedgerHash() == deadLedger))
2174 {
2175 JLOG(m_journal.trace()) << "Killing obsolete peer status";
2176 it->cycleStatus();
2177 }
2178 }
2179
2180 uint256 networkClosed;
2181 bool ledgerChange =
2182 checkLastClosedLedger(app_.overlay().getActivePeers(), networkClosed);
2183
2184 if (networkClosed.isZero())
2185 {
2186 CLOG(clog) << "endConsensus last closed ledger is zero. ";
2187 return;
2188 }
2189
2190 // WRITEME: Unless we are in FULL and in the process of doing a consensus,
2191 // we must count how many nodes share our LCL, how many nodes disagree with
2192 // our LCL, and how many validations our LCL has. We also want to check
2193 // timing to make sure there shouldn't be a newer LCL. We need this
2194 // information to do the next three tests.
2195
2196 if (((mMode == OperatingMode::CONNECTED) ||
2197 (mMode == OperatingMode::SYNCING)) &&
2198 !ledgerChange)
2199 {
2200 // Count number of peers that agree with us and UNL nodes whose
2201 // validations we have for LCL. If the ledger is good enough, go to
2202 // TRACKING - TODO
2203 if (!needNetworkLedger_)
2204 setMode(OperatingMode::TRACKING);
2205 }
2206
2207 if (((mMode == OperatingMode::CONNECTED) ||
2208 (mMode == OperatingMode::TRACKING)) &&
2209 !ledgerChange)
2210 {
2211 // check if the ledger is good enough to go to FULL
2212 // Note: Do not go to FULL if we don't have the previous ledger
2213 // check if the ledger is bad enough to go to CONNECTE D -- TODO
2214 auto current = m_ledgerMaster.getCurrentLedger();
2215 if (app_.timeKeeper().now() < (current->info().parentCloseTime +
2216 2 * current->info().closeTimeResolution))
2217 {
2218 setMode(OperatingMode::FULL);
2219 }
2220 }
2221
2222 beginConsensus(networkClosed, clog);
2223}
2224
2225void
2226NetworkOPsImp::consensusViewChange()
2227{
2228 if ((mMode == OperatingMode::FULL) || (mMode == OperatingMode::TRACKING))
2229 {
2230 setMode(OperatingMode::CONNECTED);
2231 }
2232}
2233
// Publish a received validator manifest to "manifests" stream subscribers,
// pruning subscribers whose weak pointers have expired.
//
// NOTE(review): the declaration of `jvObj` (presumably
// Json::Value jvObj(Json::objectValue)) at the top of the if-block is
// elided in this view of the file — confirm against the repository.
void
NetworkOPsImp::pubManifest(Manifest const& mo)
{
    // VFALCO consider std::shared_mutex
    std::lock_guard sl(mSubLock);

    if (!mStreamMaps[sManifests].empty())
    {

        jvObj[jss::type] = "manifestReceived";
        jvObj[jss::master_key] = toBase58(TokenType::NodePublic, mo.masterKey);
        if (mo.signingKey)
            jvObj[jss::signing_key] =
                toBase58(TokenType::NodePublic, *mo.signingKey);
        jvObj[jss::seq] = Json::UInt(mo.sequence);
        if (auto sig = mo.getSignature())
            jvObj[jss::signature] = strHex(*sig);
        jvObj[jss::master_signature] = strHex(mo.getMasterSignature());
        if (!mo.domain.empty())
            jvObj[jss::domain] = mo.domain;
        jvObj[jss::manifest] = strHex(mo.serialized);

        // Send to live subscribers; erase dead (expired) ones in place.
        for (auto i = mStreamMaps[sManifests].begin();
             i != mStreamMaps[sManifests].end();)
        {
            if (auto p = i->second.lock())
            {
                p->send(jvObj, true);
                ++i;
            }
            else
            {
                i = mStreamMaps[sManifests].erase(i);
            }
        }
    }
}
2272
// Snapshot of the fee state published on the "server" stream: the server's
// load factor and load base from the LoadFeeTrack, the base fee supplied by
// the caller, and the TxQ's escalation metrics (taken by move).
NetworkOPsImp::ServerFeeSummary::ServerFeeSummary(
    XRPAmount fee,
    TxQ::Metrics&& escalationMetrics,
    LoadFeeTrack const& loadFeeTrack)
    : loadFactorServer{loadFeeTrack.getLoadFactor()}
    , loadBaseServer{loadFeeTrack.getLoadBase()}
    , baseFee{fee}
    , em{std::move(escalationMetrics)}
{
}
2283
// Inequality comparison used to decide whether a fee update needs to be
// re-published to "server" stream subscribers.
//
// NOTE(review): the first line of this signature (presumably
// NetworkOPsImp::ServerFeeSummary::operator!=() is elided in this view of
// the file — confirm against the repository.
bool
    NetworkOPsImp::ServerFeeSummary const& b) const
{
    // Any difference in the scalar fields, or in whether escalation
    // metrics are present, means the summaries differ.
    if (loadFactorServer != b.loadFactorServer ||
        loadBaseServer != b.loadBaseServer || baseFee != b.baseFee ||
        em.has_value() != b.em.has_value())
        return true;

    // Both carry metrics: compare the fee levels that feed the stream.
    if (em && b.em)
    {
        return (
            em->minProcessingFeeLevel != b.em->minProcessingFeeLevel ||
            em->openLedgerFeeLevel != b.em->openLedgerFeeLevel ||
            em->referenceFeeLevel != b.em->referenceFeeLevel);
    }

    return false;
}
2303
// Need to cap to uint64 to uint32 due to JSON limitations
// NOTE(review): the parameter line (presumably `trunc32(std::uint64_t v)`)
// and the declaration of the `max32` cap constant are elided in this view
// of the file — confirm against the repository.
static std::uint32_t
{

    return std::min(max32, v);
};
2312
// Publish the current server status (operating mode, load factors, fees) to
// "server" stream subscribers, pruning expired subscriptions.
//
// NOTE(review): the signature line (presumably NetworkOPsImp::pubServer()),
// the subscription-lock line, the declaration of `jvObj`, and the first
// lines of the ServerFeeSummary `f{...}` initializer (including the TxQ
// metrics argument) are elided in this view of the file — confirm against
// the repository.
void
{
    // VFALCO TODO Don't hold the lock across calls to send...make a copy of
    // the list into a local array while holding the lock then release
    // the lock and call send on everyone.
    //

    if (!mStreamMaps[sServer].empty())
    {

            app_.openLedger().current()->fees().base,
            app_.getFeeTrack()};

        jvObj[jss::type] = "serverStatus";
        jvObj[jss::server_status] = strOperatingMode();
        jvObj[jss::load_base] = f.loadBaseServer;
        jvObj[jss::load_factor_server] = f.loadFactorServer;
        jvObj[jss::base_fee] = f.baseFee.jsonClipped();

        if (f.em)
        {
            // Effective load factor: the larger of the server's local load
            // factor and the TxQ's scaled open-ledger fee level.
            auto const loadFactor = std::max(
                safe_cast<std::uint64_t>(f.loadFactorServer),
                mulDiv(
                    f.em->openLedgerFeeLevel,
                    f.loadBaseServer,
                    f.em->referenceFeeLevel)

            jvObj[jss::load_factor] = trunc32(loadFactor);
            jvObj[jss::load_factor_fee_escalation] =
                f.em->openLedgerFeeLevel.jsonClipped();
            jvObj[jss::load_factor_fee_queue] =
                f.em->minProcessingFeeLevel.jsonClipped();
            jvObj[jss::load_factor_fee_reference] =
                f.em->referenceFeeLevel.jsonClipped();
        }
        else
            jvObj[jss::load_factor] = f.loadFactorServer;

        // Remember what we published so callers can diff (operator!=).
        mLastFeeSummary = f;

        for (auto i = mStreamMaps[sServer].begin();
             i != mStreamMaps[sServer].end();)
        {
            InfoSub::pointer p = i->second.lock();

            // VFALCO TODO research the possibility of using thread queues
            // and linearizing the deletion of subscribers with the
            // sending of JSON data.
            if (p)
            {
                p->send(jvObj, true);
                ++i;
            }
            else
            {
                i = mStreamMaps[sServer].erase(i);
            }
        }
    }
}
2380
// Publish a consensus phase change to "consensus" stream subscribers,
// pruning expired subscriptions.
//
// NOTE(review): the signature line (presumably
// NetworkOPsImp::reportConsensusStateChange(ConsensusPhase phase)), the
// subscription-lock line, and the declaration of `jvObj` are elided in
// this view of the file — confirm against the repository.
void
{

    auto& streamMap = mStreamMaps[sConsensusPhase];
    if (!streamMap.empty())
    {
        jvObj[jss::type] = "consensusPhase";
        jvObj[jss::consensus] = to_string(phase);

        for (auto i = streamMap.begin(); i != streamMap.end();)
        {
            if (auto p = i->second.lock())
            {
                p->send(jvObj, true);
                ++i;
            }
            else
            {
                i = streamMap.erase(i);
            }
        }
    }
}
2407
// Publish a received validation to "validations" stream subscribers. The
// message is built once and then adapted per subscriber API version via
// MultiApiJson; expired subscriptions are pruned while iterating.
//
// NOTE(review): the signature line (presumably taking a
// std::shared_ptr<STValidation> `val`), the subscription-lock line, and the
// declaration of `jvObj` are elided in this view of the file — confirm
// against the repository.
void
{
    // VFALCO consider std::shared_mutex

    if (!mStreamMaps[sValidations].empty())
    {

        auto const signerPublic = val->getSignerPublic();

        jvObj[jss::type] = "validationReceived";
        jvObj[jss::validation_public_key] =
            toBase58(TokenType::NodePublic, signerPublic);
        jvObj[jss::ledger_hash] = to_string(val->getLedgerHash());
        jvObj[jss::signature] = strHex(val->getSignature());
        jvObj[jss::full] = val->isFull();
        jvObj[jss::flags] = val->getFlags();
        jvObj[jss::signing_time] = *(*val)[~sfSigningTime];
        jvObj[jss::data] = strHex(val->getSerializer().slice());
        jvObj[jss::network_id] = app_.config().NETWORK_ID;

        if (auto version = (*val)[~sfServerVersion])
            jvObj[jss::server_version] = std::to_string(*version);

        if (auto cookie = (*val)[~sfCookie])
            jvObj[jss::cookie] = std::to_string(*cookie);

        if (auto hash = (*val)[~sfValidatedHash])
            jvObj[jss::validated_hash] = strHex(*hash);

        // If the signer maps to a different master key, report it too.
        auto const masterKey =
            app_.validatorManifests().getMasterKey(signerPublic);

        if (masterKey != signerPublic)
            jvObj[jss::master_key] = toBase58(TokenType::NodePublic, masterKey);

        // NOTE *seq is a number, but old API versions used string. We replace
        // number with a string using MultiApiJson near end of this function
        if (auto const seq = (*val)[~sfLedgerSequence])
            jvObj[jss::ledger_index] = *seq;

        if (val->isFieldPresent(sfAmendments))
        {
            jvObj[jss::amendments] = Json::Value(Json::arrayValue);
            for (auto const& amendment : val->getFieldV256(sfAmendments))
                jvObj[jss::amendments].append(to_string(amendment));
        }

        if (auto const closeTime = (*val)[~sfCloseTime])
            jvObj[jss::close_time] = *closeTime;

        if (auto const loadFee = (*val)[~sfLoadFee])
            jvObj[jss::load_fee] = *loadFee;

        if (auto const baseFee = val->at(~sfBaseFee))
            jvObj[jss::base_fee] = static_cast<double>(*baseFee);

        if (auto const reserveBase = val->at(~sfReserveBase))
            jvObj[jss::reserve_base] = *reserveBase;

        if (auto const reserveInc = val->at(~sfReserveIncrement))
            jvObj[jss::reserve_inc] = *reserveInc;

        // (The ~ operator converts the Proxy to a std::optional, which
        // simplifies later operations)
        if (auto const baseFeeXRP = ~val->at(~sfBaseFeeDrops);
            baseFeeXRP && baseFeeXRP->native())
            jvObj[jss::base_fee] = baseFeeXRP->xrp().jsonClipped();

        if (auto const reserveBaseXRP = ~val->at(~sfReserveBaseDrops);
            reserveBaseXRP && reserveBaseXRP->native())
            jvObj[jss::reserve_base] = reserveBaseXRP->xrp().jsonClipped();

        if (auto const reserveIncXRP = ~val->at(~sfReserveIncrementDrops);
            reserveIncXRP && reserveIncXRP->native())
            jvObj[jss::reserve_inc] = reserveIncXRP->xrp().jsonClipped();

        // NOTE Use MultiApiJson to publish two slightly different JSON objects
        // for consumers supporting different API versions
        MultiApiJson multiObj{jvObj};
        multiObj.visit(
            RPC::apiVersion<1>, //
            [](Json::Value& jvTx) {
                // Type conversion for older API versions to string
                if (jvTx.isMember(jss::ledger_index))
                {
                    jvTx[jss::ledger_index] =
                        std::to_string(jvTx[jss::ledger_index].asUInt());
                }
            });

        for (auto i = mStreamMaps[sValidations].begin();
             i != mStreamMaps[sValidations].end();)
        {
            if (auto p = i->second.lock())
            {
                // Send the variant matching this subscriber's API version.
                multiObj.visit(
                    p->getApiVersion(), //
                    [&](Json::Value const& jv) { p->send(jv, true); });
                ++i;
            }
            else
            {
                i = mStreamMaps[sValidations].erase(i);
            }
        }
    }
}
2518
// Publish a peer status change to "peer_status" stream subscribers. The
// JSON payload is produced lazily by `func` only when subscribers exist;
// expired subscriptions are pruned while iterating.
//
// NOTE(review): the signature line (presumably taking a Json-producing
// callable `func`) and the subscription-lock line are elided in this view
// of the file — confirm against the repository.
void
{

    if (!mStreamMaps[sPeerStatus].empty())
    {
        Json::Value jvObj(func());

        jvObj[jss::type] = "peerStatusChange";

        for (auto i = mStreamMaps[sPeerStatus].begin();
             i != mStreamMaps[sPeerStatus].end();)
        {
            InfoSub::pointer p = i->second.lock();

            if (p)
            {
                p->send(jvObj, true);
                ++i;
            }
            else
            {
                i = mStreamMaps[sPeerStatus].erase(i);
            }
        }
    }
}
2547
// Change the operating mode, publishing the new state to "server" stream
// subscribers when it actually changes.
//
// NOTE(review): the signature line (presumably
// NetworkOPsImp::setMode(OperatingMode om)), the bodies of the two timer
// conditionals below, and the statement guarded by the isBlocked() check
// (presumably clamping `om` to CONNECTED) are elided in this view of the
// file — confirm against the repository.
void
{
    using namespace std::chrono_literals;
    if (om == OperatingMode::CONNECTED)
    {
    }
    else if (om == OperatingMode::SYNCING)
    {
    }

    // A blocked server is never allowed above CONNECTED.
    if ((om > OperatingMode::CONNECTED) && isBlocked())

    if (mMode == om)
        return;

    mMode = om;

    accounting_.mode(om);

    JLOG(m_journal.info()) << "STATE->" << strOperatingMode();
    pubServer();
}
2576
// Process a validation received from `source`, publish it to subscribers,
// and report whether it should be relayed to peers.
//
// NOTE(review): the first lines of this signature (presumably
// NetworkOPsImp::recvValidation(std::shared_ptr<STValidation> const& val,),
// the lock acquisition on the pending-validations mutex, and the
// declaration of the `ss` stringstream used in the debug lambda are elided
// in this view of the file — confirm against the repository.
bool
    std::string const& source)
{
    JLOG(m_journal.trace())
        << "recvValidation " << val->getLedgerHash() << " from " << source;

    // Avoid processing the same ledger's acceptance twice: if this hash is
    // already pending, bypass the accept work in handleNewValidation.
    BypassAccept bypassAccept = BypassAccept::no;
    try
    {
        if (pendingValidations_.contains(val->getLedgerHash()))
            bypassAccept = BypassAccept::yes;
        else
            pendingValidations_.insert(val->getLedgerHash());
        // Release the lock while the (potentially slow) handling runs.
        scope_unlock unlock(lock);
        handleNewValidation(app_, val, source, bypassAccept, m_journal);
    }
    catch (std::exception const& e)
    {
        JLOG(m_journal.warn())
            << "Exception thrown for handling new validation "
            << val->getLedgerHash() << ": " << e.what();
    }
    catch (...)
    {
        JLOG(m_journal.warn())
            << "Unknown exception thrown for handling new validation "
            << val->getLedgerHash();
    }
    // Only the inserter of the pending entry removes it.
    if (bypassAccept == BypassAccept::no)
    {
        pendingValidations_.erase(val->getLedgerHash());
    }
    lock.unlock();

    pubValidation(val);

    JLOG(m_journal.debug()) << [this, &val]() -> auto {
        ss << "VALIDATION: " << val->render() << " master_key: ";
        auto master = app_.validators().getTrustedKey(val->getSignerPublic());
        if (master)
        {
            ss << toBase58(TokenType::NodePublic, *master);
        }
        else
        {
            ss << "none";
        }
        return ss.str();
    }();

    // We will always relay trusted validations; if configured, we will
    // also relay all untrusted validations.
    return app_.config().RELAY_UNTRUSTED_VALIDATIONS == 1 || val->isTrusted();
}
2635
// NOTE(review): the signature (original lines 2636-2637) is elided in
// this capture; the body returns the consensus engine's state as JSON.
// The `true` argument presumably requests full detail — confirm against
// RCLConsensus::getJson.
2638{
2639 return mConsensus.getJson(true);
2640}
2641
// Build the JSON object returned by the server_info / server_state RPCs.
// @param human    true => human-friendly units/strings (server_info);
//                 false => raw machine values (server_state).
// @param admin    caller is an administrator; unlocks extra fields
//                 (node_size, validator list detail, load, local fees).
// @param counters include perf-log and nodestore counters.
// NOTE(review): several original lines are elided in this capture (the
// embedded numbering has gaps, e.g. the declaration of `info` at 2645);
// comments below describe only the visible code.
2643NetworkOPsImp::getServerInfo(bool human, bool admin, bool counters)
2644{
2646
2647 // System-level warnings
2648 {
2649 Json::Value warnings{Json::arrayValue};
2650 if (isAmendmentBlocked())
2651 {
2652 Json::Value& w = warnings.append(Json::objectValue);
2653 w[jss::id] = warnRPC_AMENDMENT_BLOCKED;
2654 w[jss::message] =
2655 "This server is amendment blocked, and must be updated to be "
2656 "able to stay in sync with the network.";
2657 }
2658 if (isUNLBlocked())
2659 {
2660 Json::Value& w = warnings.append(Json::objectValue);
2661 w[jss::id] = warnRPC_EXPIRED_VALIDATOR_LIST;
2662 w[jss::message] =
2663 "This server has an expired validator list. validators.txt "
2664 "may be incorrectly configured or some [validator_list_sites] "
2665 "may be unreachable.";
2666 }
    // Only admins are warned about unsupported-amendment majority.
2667 if (admin && isAmendmentWarned())
2668 {
2669 Json::Value& w = warnings.append(Json::objectValue);
2670 w[jss::id] = warnRPC_UNSUPPORTED_MAJORITY;
2671 w[jss::message] =
2672 "One or more unsupported amendments have reached majority. "
2673 "Upgrade to the latest version before they are activated "
2674 "to avoid being amendment blocked.";
    // NOTE(review): the initializer of `expected` (line 2676) is
    // elided — presumably the first expected amendment activation time.
2675 if (auto const expected =
2677 {
2678 auto& d = w[jss::details] = Json::objectValue;
2679 d[jss::expected_date] = expected->time_since_epoch().count();
2680 d[jss::expected_date_UTC] = to_string(*expected);
2681 }
2682 }
2683
2684 if (warnings.size())
2685 info[jss::warnings] = std::move(warnings);
2686 }
2687
2688 // hostid: unique string describing the machine
2689 if (human)
2690 info[jss::hostid] = getHostId(admin);
2691
2692 // domain: if configured with a domain, report it:
2693 if (!app_.config().SERVER_DOMAIN.empty())
2694 info[jss::server_domain] = app_.config().SERVER_DOMAIN;
2695
2696 info[jss::build_version] = BuildInfo::getVersionString();
2697
2698 info[jss::server_state] = strOperatingMode(admin);
2699
2700 info[jss::time] = to_string(std::chrono::floor<std::chrono::microseconds>(
2702
2704 info[jss::network_ledger] = "waiting";
2705
2706 info[jss::validation_quorum] =
2707 static_cast<Json::UInt>(app_.validators().quorum());
2708
    // Admin-only detail: configured node size, validator-list status,
    // and build provenance.
2709 if (admin)
2710 {
2711 switch (app_.config().NODE_SIZE)
2712 {
2713 case 0:
2714 info[jss::node_size] = "tiny";
2715 break;
2716 case 1:
2717 info[jss::node_size] = "small";
2718 break;
2719 case 2:
2720 info[jss::node_size] = "medium";
2721 break;
2722 case 3:
2723 info[jss::node_size] = "large";
2724 break;
2725 case 4:
2726 info[jss::node_size] = "huge";
2727 break;
2728 }
2729
2730 auto when = app_.validators().expires();
2731
2732 if (!human)
2733 {
2734 if (when)
2735 info[jss::validator_list_expires] =
2736 safe_cast<Json::UInt>(when->time_since_epoch().count());
2737 else
2738 info[jss::validator_list_expires] = 0;
2739 }
2740 else
2741 {
2742 auto& x = (info[jss::validator_list] = Json::objectValue);
2743
2744 x[jss::count] = static_cast<Json::UInt>(app_.validators().count());
2745
2746 if (when)
2747 {
    // time_point::max() is the sentinel for a list that never expires.
2748 if (*when == TimeKeeper::time_point::max())
2749 {
2750 x[jss::expiration] = "never";
2751 x[jss::status] = "active";
2752 }
2753 else
2754 {
2755 x[jss::expiration] = to_string(*when);
2756
2757 if (*when > app_.timeKeeper().now())
2758 x[jss::status] = "active";
2759 else
2760 x[jss::status] = "expired";
2761 }
2762 }
2763 else
2764 {
2765 x[jss::status] = "unknown";
2766 x[jss::expiration] = "unknown";
2767 }
2768 }
2769
2770#if defined(GIT_COMMIT_HASH) || defined(GIT_BRANCH)
2771 {
2772 auto& x = (info[jss::git] = Json::objectValue);
2773#ifdef GIT_COMMIT_HASH
2774 x[jss::hash] = GIT_COMMIT_HASH;
2775#endif
2776#ifdef GIT_BRANCH
2777 x[jss::branch] = GIT_BRANCH;
2778#endif
2779 }
2780#endif
2781 }
2782 info[jss::io_latency_ms] =
2783 static_cast<Json::UInt>(app_.getIOLatency().count());
2784
2785 if (admin)
2786 {
2787 if (auto const localPubKey = app_.validators().localPublicKey();
2788 localPubKey && app_.getValidationPublicKey())
2789 {
2790 info[jss::pubkey_validator] =
2791 toBase58(TokenType::NodePublic, localPubKey.value())
2792 }
2793 else
2794 {
2795 info[jss::pubkey_validator] = "none";
2796 }
2797 }
2798
2799 if (counters)
2800 {
2801 info[jss::counters] = app_.getPerfLog().countersJson();
2802
2803 Json::Value nodestore(Json::objectValue);
2804 app_.getNodeStore().getCountsJson(nodestore);
2805 info[jss::counters][jss::nodestore] = nodestore;
2806 info[jss::current_activities] = app_.getPerfLog().currentJson();
2807 }
2808
2809 info[jss::pubkey_node] =
2811
2812 info[jss::complete_ledgers] = app_.getLedgerMaster().getCompleteLedgers();
2813
    // NOTE(review): the condition guarding this assignment (line 2814)
    // is elided — presumably `if (isAmendmentBlocked())`.
2815 info[jss::amendment_blocked] = true;
2816
2817 auto const fp = m_ledgerMaster.getFetchPackCacheSize();
2818
2819 if (fp != 0)
2820 info[jss::fetch_pack] = Json::UInt(fp);
2821
2822 info[jss::peers] = Json::UInt(app_.overlay().size());
2823
2824 Json::Value lastClose = Json::objectValue;
2825 lastClose[jss::proposers] = Json::UInt(mConsensus.prevProposers());
2826
2827 if (human)
2828 {
2829 lastClose[jss::converge_time_s] =
2831 }
2832 else
2833 {
2834 lastClose[jss::converge_time] =
2836 }
2837
2838 info[jss::last_close] = lastClose;
2839
2840 // info[jss::consensus] = mConsensus.getJson();
2841
2842 if (admin)
2843 info[jss::load] = m_job_queue.getJson();
2844
2845 if (auto const netid = app_.overlay().networkID())
2846 info[jss::network_id] = static_cast<Json::UInt>(*netid);
2847
    // Fee/load reporting: combine the server's load factor with the
    // open-ledger fee escalation level from the TxQ metrics.
2848 auto const escalationMetrics =
2850
2851 auto const loadFactorServer = app_.getFeeTrack().getLoadFactor();
2852 auto const loadBaseServer = app_.getFeeTrack().getLoadBase();
2853 /* Scale the escalated fee level to unitless "load factor".
2854 In practice, this just strips the units, but it will continue
2855 to work correctly if either base value ever changes. */
2856 auto const loadFactorFeeEscalation =
2857 mulDiv(
2858 escalationMetrics.openLedgerFeeLevel,
2859 loadBaseServer,
2860 escalationMetrics.referenceFeeLevel)
2862
2863 auto const loadFactor = std::max(
2864 safe_cast<std::uint64_t>(loadFactorServer), loadFactorFeeEscalation);
2865
2866 if (!human)
2867 {
2868 info[jss::load_base] = loadBaseServer;
2869 info[jss::load_factor] = trunc32(loadFactor);
2870 info[jss::load_factor_server] = loadFactorServer;
2871
2872 /* Json::Value doesn't support uint64, so clamp to max
2873 uint32 value. This is mostly theoretical, since there
2874 probably isn't enough extant XRP to drive the factor
2875 that high.
2876 */
2877 info[jss::load_factor_fee_escalation] =
2878 escalationMetrics.openLedgerFeeLevel.jsonClipped();
2879 info[jss::load_factor_fee_queue] =
2880 escalationMetrics.minProcessingFeeLevel.jsonClipped();
2881 info[jss::load_factor_fee_reference] =
2882 escalationMetrics.referenceFeeLevel.jsonClipped();
2883 }
2884 else
2885 {
    // Human form: report load factors as decimal ratios of the base.
2886 info[jss::load_factor] =
2887 static_cast<double>(loadFactor) / loadBaseServer;
2888
2889 if (loadFactorServer != loadFactor)
2890 info[jss::load_factor_server] =
2891 static_cast<double>(loadFactorServer) / loadBaseServer;
2892
2893 if (admin)
2894 {
    // NOTE(review): the declaration of `fee` (line 2895) is elided —
    // presumably initialized from app_.getFeeTrack().getLocalFee().
2896 if (fee != loadBaseServer)
2897 info[jss::load_factor_local] =
2898 static_cast<double>(fee) / loadBaseServer;
2899 fee = app_.getFeeTrack().getRemoteFee();
2900 if (fee != loadBaseServer)
2901 info[jss::load_factor_net] =
2902 static_cast<double>(fee) / loadBaseServer;
2903 fee = app_.getFeeTrack().getClusterFee();
2904 if (fee != loadBaseServer)
2905 info[jss::load_factor_cluster] =
2906 static_cast<double>(fee) / loadBaseServer;
2907 }
2908 if (escalationMetrics.openLedgerFeeLevel !=
2909 escalationMetrics.referenceFeeLevel &&
2910 (admin || loadFactorFeeEscalation != loadFactor))
2911 info[jss::load_factor_fee_escalation] =
2912 escalationMetrics.openLedgerFeeLevel.decimalFromReference(
2913 escalationMetrics.referenceFeeLevel);
2914 if (escalationMetrics.minProcessingFeeLevel !=
2915 escalationMetrics.referenceFeeLevel)
2916 info[jss::load_factor_fee_queue] =
2917 escalationMetrics.minProcessingFeeLevel.decimalFromReference(
2918 escalationMetrics.referenceFeeLevel);
2919 }
2920
    // Ledger section: prefer the validated ledger; fall back to the
    // closed ledger (reported as closed_ledger rather than validated).
2921 bool valid = false;
2922 auto lpClosed = m_ledgerMaster.getValidatedLedger();
2923
2924 if (lpClosed)
2925 valid = true;
2926 else
2927 lpClosed = m_ledgerMaster.getClosedLedger();
2928
2929 if (lpClosed)
2930 {
2931 XRPAmount const baseFee = lpClosed->fees().base;
2933 l[jss::seq] = Json::UInt(lpClosed->info().seq);
2934 l[jss::hash] = to_string(lpClosed->info().hash);
2935
2936 if (!human)
2937 {
2938 l[jss::base_fee] = baseFee.jsonClipped();
2939 l[jss::reserve_base] = lpClosed->fees().reserve.jsonClipped();
2940 l[jss::reserve_inc] = lpClosed->fees().increment.jsonClipped();
2941 l[jss::close_time] = Json::Value::UInt(
2942 lpClosed->info().closeTime.time_since_epoch().count());
2943 }
2944 else
2945 {
2946 l[jss::base_fee_xrp] = baseFee.decimalXRP();
2947 l[jss::reserve_base_xrp] = lpClosed->fees().reserve.decimalXRP();
2948 l[jss::reserve_inc_xrp] = lpClosed->fees().increment.decimalXRP();
2949
    // Only surface a close-time offset of a minute or more.
2950 if (auto const closeOffset = app_.timeKeeper().closeOffset();
2951 std::abs(closeOffset.count()) >= 60)
2952 l[jss::close_time_offset] =
2953 static_cast<std::uint32_t>(closeOffset.count());
2954
    // Ages at or beyond this threshold are reported as 0 (treated as
    // nonsensical rather than real).
2955 constexpr std::chrono::seconds highAgeThreshold{1000000};
2957 {
2958 auto const age = m_ledgerMaster.getValidatedLedgerAge();
2959 l[jss::age] =
2960 Json::UInt(age < highAgeThreshold ? age.count() : 0);
2961 }
2962 else
2963 {
2964 auto lCloseTime = lpClosed->info().closeTime;
2965 auto closeTime = app_.timeKeeper().closeTime();
2966 if (lCloseTime <= closeTime)
2967 {
2968 using namespace std::chrono_literals;
2969 auto age = closeTime - lCloseTime;
2970 l[jss::age] =
2971 Json::UInt(age < highAgeThreshold ? age.count() : 0);
2972 }
2973 }
2974 }
2975
2976 if (valid)
2977 info[jss::validated_ledger] = l;
2978 else
2979 info[jss::closed_ledger] = l;
2980
2981 auto lpPublished = m_ledgerMaster.getPublishedLedger();
2982 if (!lpPublished)
2983 info[jss::published_ledger] = "none";
2984 else if (lpPublished->info().seq != lpClosed->info().seq)
2985 info[jss::published_ledger] = lpPublished->info().seq;
2986 }
2987
2988 accounting_.json(info);
2989 info[jss::uptime] = UptimeClock::now().time_since_epoch().count();
2990 info[jss::jq_trans_overflow] =
2992 info[jss::peer_disconnects] =
2994 info[jss::peer_disconnects_resources] =
2996
2997 // This array must be sorted in increasing order.
2998 static constexpr std::array<std::string_view, 7> protocols{
2999 "http", "https", "peer", "ws", "ws2", "wss", "wss2"};
3000 static_assert(std::is_sorted(std::begin(protocols), std::end(protocols)));
3001 {
    // Report configured listening ports and their protocols.
3003 for (auto const& port : app_.getServerHandler().setup().ports)
3004 {
3005 // Don't publish admin ports for non-admin users
3006 if (!admin &&
3007 !(port.admin_nets_v4.empty() && port.admin_nets_v6.empty() &&
3008 port.admin_user.empty() && port.admin_password.empty()))
3009 continue;
    // NOTE(review): lines 3010-3011 are elided — presumably the
    // declaration of `proto` and a std::set_intersection( call whose
    // remaining arguments follow.
3012 std::begin(port.protocol),
3013 std::end(port.protocol),
3014 std::begin(protocols),
3015 std::end(protocols),
3016 std::back_inserter(proto));
3017 if (!proto.empty())
3018 {
3019 auto& jv = ports.append(Json::Value(Json::objectValue));
3020 jv[jss::port] = std::to_string(port.port);
3021 jv[jss::protocol] = Json::Value{Json::arrayValue};
3022 for (auto const& p : proto)
3023 jv[jss::protocol].append(p);
3024 }
3025 }
3026
    // gRPC is configured separately from the regular server ports.
3027 if (app_.config().exists(SECTION_PORT_GRPC))
3028 {
3029 auto const& grpcSection = app_.config().section(SECTION_PORT_GRPC);
3030 auto const optPort = grpcSection.get("port");
3031 if (optPort && grpcSection.get("ip"))
3032 {
3033 auto& jv = ports.append(Json::Value(Json::objectValue));
3034 jv[jss::port] = *optPort;
3035 jv[jss::protocol] = Json::Value{Json::arrayValue};
3036 jv[jss::protocol].append("grpc");
3037 }
3038 }
3039 info[jss::ports] = std::move(ports);
3040 }
3041
3042 return info;
3043}
3044
// NOTE(review): this function is almost entirely elided in this capture —
// only the return type survives (embedded numbering jumps 3045 -> 3050 ->
// 3056). Its identity and behavior cannot be documented from here.
3045void
3050
3056
// Publish a proposed (not yet validated) transaction to all
// real-time-transaction stream subscribers, then fan out to per-account
// real-time subscribers. NOTE(review): the function-name line (3058) is
// elided; the call on line 3093 indicates this is the proposed-tx
// publisher.
3057void
3059 std::shared_ptr<ReadView const> const& ledger,
3060 std::shared_ptr<STTx const> const& transaction,
3061 TER result)
3062{
3063 // never publish an inner txn inside a batch txn
3064 if (transaction->isFlag(tfInnerBatchTxn) &&
3065 ledger->rules().enabled(featureBatch))
3066 return;
3067
    // Build the per-API-version JSON once; validated=false, no metadata.
3068 MultiApiJson jvObj =
3069 transJson(transaction, result, false, ledger, std::nullopt);
3070
3071 {
    // NOTE(review): line 3072 is elided — presumably the lock guarding
    // mStreamMaps.
3073
3074 auto it = mStreamMaps[sRTTransactions].begin();
3075 while (it != mStreamMaps[sRTTransactions].end())
3076 {
3077 InfoSub::pointer p = it->second.lock();
3078
3079 if (p)
3080 {
    // Send the JSON variant matching this subscriber's API version.
3081 jvObj.visit(
3082 p->getApiVersion(), //
3083 [&](Json::Value const& jv) { p->send(jv, true); });
3084 ++it;
3085 }
3086 else
3087 {
    // Subscriber is gone; drop the dead weak_ptr entry.
3088 it = mStreamMaps[sRTTransactions].erase(it);
3089 }
3090 }
3091 }
3092
3093 pubProposedAccountTransaction(ledger, transaction, result);
3094}
3095
// Publish a newly accepted/validated ledger: the ledger-closed event to
// "ledger" stream subscribers, book changes to "book_changes" subscribers,
// and every transaction in the ledger to transaction/account subscribers.
// NOTE(review): the signature line (3097) is elided; the body operates on
// `lpAccepted` (the accepted ledger).
3096void
3098{
3099 // Ledgers are published only when they acquire sufficient validations
3100 // Holes are filled across connection loss or other catastrophe
3101
    // NOTE(review): the declaration of `alpAccepted` (line 3102) is
    // elided; it receives the cached AcceptedLedger, if present.
3103 app_.getAcceptedLedgerCache().fetch(lpAccepted->info().hash);
3104 if (!alpAccepted)
3105 {
    // Not cached yet: build the AcceptedLedger and canonicalize it into
    // the cache (another thread may have raced us).
3106 alpAccepted = std::make_shared<AcceptedLedger>(lpAccepted, app_);
3107 app_.getAcceptedLedgerCache().canonicalize_replace_client(
3108 lpAccepted->info().hash, alpAccepted);
3109 }
3110
3111 XRPL_ASSERT(
3112 alpAccepted->getLedger().get() == lpAccepted.get(),
3113 "ripple::NetworkOPsImp::pubLedger : accepted input");
3114
3115 {
3116 JLOG(m_journal.debug())
3117 << "Publishing ledger " << lpAccepted->info().seq << " "
3118 << lpAccepted->info().hash;
3119
    // NOTE(review): line 3120 is elided — presumably the lock guarding
    // mStreamMaps.
3121
3122 if (!mStreamMaps[sLedger].empty())
3123 {
    // Compose the "ledgerClosed" event sent to ledger-stream clients.
3125
3126 jvObj[jss::type] = "ledgerClosed";
3127 jvObj[jss::ledger_index] = lpAccepted->info().seq;
3128 jvObj[jss::ledger_hash] = to_string(lpAccepted->info().hash);
3129 jvObj[jss::ledger_time] = Json::Value::UInt(
3130 lpAccepted->info().closeTime.time_since_epoch().count());
3131
3132 jvObj[jss::network_id] = app_.config().NETWORK_ID;
3133
    // fee_ref is only reported pre-XRPFees, and is deprecated.
3134 if (!lpAccepted->rules().enabled(featureXRPFees))
3135 jvObj[jss::fee_ref] = Config::FEE_UNITS_DEPRECATED;
3136 jvObj[jss::fee_base] = lpAccepted->fees().base.jsonClipped();
3137 jvObj[jss::reserve_base] = lpAccepted->fees().reserve.jsonClipped();
3138 jvObj[jss::reserve_inc] =
3139 lpAccepted->fees().increment.jsonClipped();
3140
3141 jvObj[jss::txn_count] = Json::UInt(alpAccepted->size());
3142
    // NOTE(review): the condition on line 3143 is elided — presumably
    // gates on the operating mode before reporting validated_ledgers.
3144 {
3145 jvObj[jss::validated_ledgers] =
3147 }
3148
3149 auto it = mStreamMaps[sLedger].begin();
3150 while (it != mStreamMaps[sLedger].end())
3151 {
3152 InfoSub::pointer p = it->second.lock();
3153 if (p)
3154 {
3155 p->send(jvObj, true);
3156 ++it;
3157 }
3158 else
    // Drop entries whose subscriber has gone away.
3159 it = mStreamMaps[sLedger].erase(it);
3160 }
3161 }
3162
3163 if (!mStreamMaps[sBookChanges].empty())
3164 {
3165 Json::Value jvObj = ripple::RPC::computeBookChanges(lpAccepted);
3166
3167 auto it = mStreamMaps[sBookChanges].begin();
3168 while (it != mStreamMaps[sBookChanges].end())
3169 {
3170 InfoSub::pointer p = it->second.lock();
3171 if (p)
3172 {
3173 p->send(jvObj, true);
3174 ++it;
3175 }
3176 else
3177 it = mStreamMaps[sBookChanges].erase(it);
3178 }
3179 }
3180
3181 {
    // One-time kick-off of deferred account_history subscriptions once
    // the first validated ledger is published.
3182 static bool firstTime = true;
3183 if (firstTime)
3184 {
3185 // First validated ledger, start delayed SubAccountHistory
3186 firstTime = false;
3187 for (auto& outer : mSubAccountHistory)
3188 {
3189 for (auto& inner : outer.second)
3190 {
3191 auto& subInfo = inner.second;
3192 if (subInfo.index_->separationLedgerSeq_ == 0)
3193 {
    // NOTE(review): the call on line 3194 is elided — presumably
    // the routine that starts the history subscription for subInfo.
3195 alpAccepted->getLedger(), subInfo);
3196 }
3197 }
3198 }
3199 }
3200 }
3201 }
3202
3203 // Don't lock since pubAcceptedTransaction is locking.
3204 for (auto const& accTx : *alpAccepted)
3205 {
3206 JLOG(m_journal.trace()) << "pubAccepted: " << accTx->getJson();
    // The third argument marks the last transaction of the ledger.
3208 lpAccepted, *accTx, accTx == *(--alpAccepted->end()));
3209 }
3210}
3211
// Detect a change in the fee/load summary and, if anything changed,
// schedule a job to push the new server status to subscribers.
// NOTE(review): the signature line (3213) and the start of the summary
// object's construction (3215, 3217) are elided in this capture.
3212void
3214{
3216 app_.openLedger().current()->fees().base,
3218 app_.getFeeTrack()};
3219
3220 // only schedule the job if something has changed
3221 if (f != mLastFeeSummary)
3222 {
    // NOTE(review): line 3223 is elided — presumably the addJob( call
    // on the job queue that these arguments belong to.
3224 jtCLIENT_FEE_CHANGE, "reportFeeChange->pubServer", [this]() {
3225 pubServer();
3226 });
3227 }
3228}
3229
// Schedule a job that publishes a consensus-phase change to subscribers.
// NOTE(review): the signature line (3231) and the job-queue call line
// (3233-3234) are elided; the lambda captures `phase`.
3230void
3232{
3235 "reportConsensusStateChange->pubConsensus",
3236 [this, phase]() { pubConsensus(phase); });
3237}
3238
// Purge stale entries from the local-transaction set against `view`.
// NOTE(review): the signature line (3240) is elided; the body takes a
// `view` parameter and forwards to LocalTxs::sweep.
3239inline void
3241{
3242 m_localTX->sweep(view);
3243}
// Number of transactions currently held in the local-transaction set.
// NOTE(review): the signature line (3245) is elided in this capture.
3244inline std::size_t
3246{
3247 return m_localTX->size();
3248}
3249
3250// This routine should only be used to publish accepted or validated
3251// transactions.
// Build the per-API-version JSON representation of a transaction event.
// @param transaction the transaction being published
// @param result      its engine result (TER)
// @param validated   true for validated ledgers, false for proposed txns
// @param ledger      the ledger the transaction applies to
// (the remaining parameter, `meta`, is an optional metadata reference;
// its declaration lines 3252-3253 and 3258 are elided in this capture,
// as is the declaration of the working `jvObj` on line 3260)
3254 std::shared_ptr<STTx const> const& transaction,
3255 TER result,
3256 bool validated,
3257 std::shared_ptr<ReadView const> const& ledger,
3259{
3261 std::string sToken;
3262 std::string sHuman;
3263
    // Translate the TER into its token ("tesSUCCESS") and message forms.
3264 transResultInfo(result, sToken, sHuman);
3265
3266 jvObj[jss::type] = "transaction";
3267 // NOTE jvObj is not a finished object for either API version. After
3268 // it's populated, we need to finish it for a specific API version. This is
3269 // done in a loop, near the end of this function.
3270 jvObj[jss::transaction] =
3271 transaction->getJson(JsonOptions::disable_API_prior_V2, false);
3272
3273 if (meta)
3274 {
    // Attach metadata plus synthetic fields derived from it (the calls
    // on the elided lines 3276/3279 begin RPC::insert... invocations).
3275 jvObj[jss::meta] = meta->get().getJson(JsonOptions::none);
3277 jvObj[jss::meta], *ledger, transaction, meta->get());
3278 RPC::insertNFTSyntheticInJson(jvObj, transaction, meta->get());
3280 jvObj[jss::meta], transaction, meta->get());
3281 }
3282
3283 // add CTID where the needed data for it exists
3284 if (auto const& lookup = ledger->txRead(transaction->getTransactionID());
3285 lookup.second && lookup.second->isFieldPresent(sfTransactionIndex))
3286 {
3287 uint32_t const txnSeq = lookup.second->getFieldU32(sfTransactionIndex);
    // Prefer the transaction's own NetworkID field over the config's.
3288 uint32_t netID = app_.config().NETWORK_ID;
3289 if (transaction->isFieldPresent(sfNetworkID))
3290 netID = transaction->getFieldU32(sfNetworkID);
3291
3293 RPC::encodeCTID(ledger->info().seq, txnSeq, netID);
3294 ctid)
3295 jvObj[jss::ctid] = *ctid;
3296 }
3297 if (!ledger->open())
3298 jvObj[jss::ledger_hash] = to_string(ledger->info().hash);
3299
3300 if (validated)
3301 {
3302 jvObj[jss::ledger_index] = ledger->info().seq;
3303 jvObj[jss::transaction][jss::date] =
3304 ledger->info().closeTime.time_since_epoch().count();
3305 jvObj[jss::validated] = true;
3306 jvObj[jss::close_time_iso] = to_string_iso(ledger->info().closeTime);
3307
3308 // WRITEME: Put the account next seq here
3309 }
3310 else
3311 {
3312 jvObj[jss::validated] = false;
3313 jvObj[jss::ledger_current_index] = ledger->info().seq;
3314 }
3315
3316 jvObj[jss::status] = validated ? "closed" : "proposed";
3317 jvObj[jss::engine_result] = sToken;
3318 jvObj[jss::engine_result_code] = result;
3319 jvObj[jss::engine_result_message] = sHuman;
3320
3321 if (transaction->getTxnType() == ttOFFER_CREATE)
3322 {
3323 auto const account = transaction->getAccountID(sfAccount);
3324 auto const amount = transaction->getFieldAmount(sfTakerGets);
3325
3326 // If the offer create is not self funded then add the owner balance
3327 if (account != amount.issue().account)
3328 {
3329 auto const ownerFunds = accountFunds(
3330 *ledger,
3331 account,
3332 amount,
3334 app_.journal("View"));
3335 jvObj[jss::transaction][jss::owner_funds] = ownerFunds.getText();
3336 }
3337 }
3338
    // Finish the object per API version: v1 keeps the hash inside
    // "transaction"; v2+ renames "transaction" to "tx_json" and hoists
    // the hash to the top level. DeliverMax handling is version-aware.
3339 std::string const hash = to_string(transaction->getTransactionID());
3340 MultiApiJson multiObj{jvObj};
3342 multiObj.visit(), //
3343 [&]<unsigned Version>(
3345 RPC::insertDeliverMax(
3346 jvTx[jss::transaction], transaction->getTxnType(), Version);
3347
3348 if constexpr (Version > 1)
3349 {
3350 jvTx[jss::tx_json] = jvTx.removeMember(jss::transaction);
3351 jvTx[jss::hash] = hash;
3352 }
3353 else
3354 {
3355 jvTx[jss::transaction][jss::hash] = hash;
3356 }
3357 });
3358
3359 return multiObj;
3360}
3361
// Publish a validated transaction to the "transactions" and
// "transactions_proposed" streams, notify the order-book machinery on
// success, and fan out to per-account subscribers.
// NOTE(review): the signature lines (3363-3366 in the original) are
// elided; parameters in use are `ledger`, `transaction`, and `last`.
3362void
3364 std::shared_ptr<ReadView const> const& ledger,
3365 AcceptedLedgerTx const& transaction,
3366 bool last)
3367{
3368 auto const& stTxn = transaction.getTxn();
3369
3370 // Create two different Json objects, for different API versions
3371 auto const metaRef = std::ref(transaction.getMeta());
3372 auto const trResult = transaction.getResult();
3373 MultiApiJson jvObj = transJson(stTxn, trResult, true, ledger, metaRef);
3374
3375 {
    // NOTE(review): line 3376 is elided — presumably the lock guarding
    // mStreamMaps.
3377
3378 auto it = mStreamMaps[sTransactions].begin();
3379 while (it != mStreamMaps[sTransactions].end())
3380 {
3381 InfoSub::pointer p = it->second.lock();
3382
3383 if (p)
3384 {
    // Deliver the variant matching the subscriber's API version.
3385 jvObj.visit(
3386 p->getApiVersion(), //
3387 [&](Json::Value const& jv) { p->send(jv, true); });
3388 ++it;
3389 }
3390 else
    // Prune subscribers that have disconnected.
3391 it = mStreamMaps[sTransactions].erase(it);
3392 }
3393
    // Validated transactions also go to the real-time stream.
3394 it = mStreamMaps[sRTTransactions].begin();
3395
3396 while (it != mStreamMaps[sRTTransactions].end())
3397 {
3398 InfoSub::pointer p = it->second.lock();
3399
3400 if (p)
3401 {
3402 jvObj.visit(
3403 p->getApiVersion(), //
3404 [&](Json::Value const& jv) { p->send(jv, true); });
3405 ++it;
3406 }
3407 else
3408 it = mStreamMaps[sRTTransactions].erase(it);
3409 }
3410 }
3411
3412 if (transaction.getResult() == tesSUCCESS)
3413 app_.getOrderBookDB().processTxn(ledger, transaction, jvObj);
3414
3415 pubAccountTransaction(ledger, transaction, last);
3416}
3417
// Fan a validated transaction out to per-account subscribers: real-time
// ("accounts_proposed"), accepted ("accounts"), and account_history
// streams, for every account the transaction affected.
// @param last true if this is the final transaction of its ledger (sets
//             the account_history_boundary marker).
// NOTE(review): the signature line (3419) and the declaration of
// `notify` (line 3424) are elided in this capture.
3418void
3420 std::shared_ptr<ReadView const> const& ledger,
3421 AcceptedLedgerTx const& transaction,
3422 bool last)
3423{
3425 int iProposed = 0;
3426 int iAccepted = 0;
3427
3428 std::vector<SubAccountHistoryInfo> accountHistoryNotify;
3429 auto const currLedgerSeq = ledger->seq();
3430 {
    // NOTE(review): line 3431 is elided — presumably the lock guarding
    // the subscription maps.
3432
3433 if (!mSubAccount.empty() || !mSubRTAccount.empty() ||
3435 {
3436 for (auto const& affectedAccount : transaction.getAffected())
3437 {
    // Real-time subscribers for this account.
3438 if (auto simiIt = mSubRTAccount.find(affectedAccount);
3439 simiIt != mSubRTAccount.end())
3440 {
3441 auto it = simiIt->second.begin();
3442
3443 while (it != simiIt->second.end())
3444 {
3445 InfoSub::pointer p = it->second.lock();
3446
3447 if (p)
3448 {
3449 notify.insert(p);
3450 ++it;
3451 ++iProposed;
3452 }
3453 else
    // Drop dead weak_ptr entries as we go.
3454 it = simiIt->second.erase(it);
3455 }
3456 }
3457
    // Accepted-transaction subscribers for this account.
3458 if (auto simiIt = mSubAccount.find(affectedAccount);
3459 simiIt != mSubAccount.end())
3460 {
3461 auto it = simiIt->second.begin();
3462 while (it != simiIt->second.end())
3463 {
3464 InfoSub::pointer p = it->second.lock();
3465
3466 if (p)
3467 {
3468 notify.insert(p);
3469 ++it;
3470 ++iAccepted;
3471 }
3472 else
3473 it = simiIt->second.erase(it);
3474 }
3475 }
3476
    // account_history subscribers for this account.
3477 if (auto histoIt = mSubAccountHistory.find(affectedAccount);
3478 histoIt != mSubAccountHistory.end())
3479 {
3480 auto& subs = histoIt->second;
3481 auto it = subs.begin();
3482 while (it != subs.end())
3483 {
3484 SubAccountHistoryInfoWeak const& info = it->second;
    // Ledgers at or before the separation point are delivered by
    // the backfill job, not the forward stream.
3485 if (currLedgerSeq <= info.index_->separationLedgerSeq_)
3486 {
3487 ++it;
3488 continue;
3489 }
3490
3491 if (auto isSptr = info.sinkWptr_.lock(); isSptr)
3492 {
3493 accountHistoryNotify.emplace_back(
3494 SubAccountHistoryInfo{isSptr, info.index_});
3495 ++it;
3496 }
3497 else
3498 {
3499 it = subs.erase(it);
3500 }
3501 }
3502 if (subs.empty())
3503 mSubAccountHistory.erase(histoIt);
3504 }
3505 }
3506 }
3507 }
3508
3509 JLOG(m_journal.trace())
3510 << "pubAccountTransaction: "
3511 << "proposed=" << iProposed << ", accepted=" << iAccepted;
3512
3513 if (!notify.empty() || !accountHistoryNotify.empty())
3514 {
3515 auto const& stTxn = transaction.getTxn();
3516
3517 // Create two different Json objects, for different API versions
3518 auto const metaRef = std::ref(transaction.getMeta());
3519 auto const trResult = transaction.getResult();
3520 MultiApiJson jvObj = transJson(stTxn, trResult, true, ledger, metaRef);
3521
3522 for (InfoSub::ref isrListener : notify)
3523 {
3524 jvObj.visit(
3525 isrListener->getApiVersion(), //
3526 [&](Json::Value const& jv) { isrListener->send(jv, true); });
3527 }
3528
    // Mark the ledger boundary only on the last transaction; the
    // account_history fields below are added after the plain
    // subscribers above were served, so they never see them.
3529 if (last)
3530 jvObj.set(jss::account_history_boundary, true);
3531
3532 XRPL_ASSERT(
3533 jvObj.isMember(jss::account_history_tx_stream) ==
3535 "ripple::NetworkOPsImp::pubAccountTransaction : "
3536 "account_history_tx_stream not set");
3537 for (auto& info : accountHistoryNotify)
3538 {
3539 auto& index = info.index_;
    // First forward transaction with no backfill pending.
3540 if (index->forwardTxIndex_ == 0 && !index->haveHistorical_)
3541 jvObj.set(jss::account_history_tx_first, true);
3542
3543 jvObj.set(jss::account_history_tx_index, index->forwardTxIndex_++);
3544
3545 jvObj.visit(
3546 info.sink_->getApiVersion(), //
3547 [&](Json::Value const& jv) { info.sink_->send(jv, true); });
3548 }
3549 }
3550}
3551
// Fan a proposed (unvalidated) transaction out to the per-account
// real-time subscribers of every account the transaction mentions.
// NOTE(review): the signature line (3553), the `tx` parameter line
// (3555), and the declaration of `notify` (3558) are elided here.
3552void
3554 std::shared_ptr<ReadView const> const& ledger,
3556 TER result)
3557{
3559 int iProposed = 0;
3560
    // Kept for symmetry with pubAccountTransaction; nothing visible in
    // this function ever fills it, so the history loop below is inert.
3561 std::vector<SubAccountHistoryInfo> accountHistoryNotify;
3562
3563 {
    // NOTE(review): line 3564 is elided — presumably the lock guarding
    // the subscription maps.
3565
    // Fast exit: proposed transactions only matter to RT subscribers.
3566 if (mSubRTAccount.empty())
3567 return;
3568
3569 if (!mSubAccount.empty() || !mSubRTAccount.empty() ||
3571 {
3572 for (auto const& affectedAccount : tx->getMentionedAccounts())
3573 {
3574 if (auto simiIt = mSubRTAccount.find(affectedAccount);
3575 simiIt != mSubRTAccount.end())
3576 {
3577 auto it = simiIt->second.begin();
3578
3579 while (it != simiIt->second.end())
3580 {
3581 InfoSub::pointer p = it->second.lock();
3582
3583 if (p)
3584 {
3585 notify.insert(p);
3586 ++it;
3587 ++iProposed;
3588 }
3589 else
    // Drop entries whose subscriber has disconnected.
3590 it = simiIt->second.erase(it);
3591 }
3592 }
3593 }
3594 }
3595 }
3596
3597 JLOG(m_journal.trace()) << "pubProposedAccountTransaction: " << iProposed;
3598
3599 if (!notify.empty() || !accountHistoryNotify.empty())
3600 {
3601 // Create two different Json objects, for different API versions
3602 MultiApiJson jvObj = transJson(tx, result, false, ledger, std::nullopt);
3603
3604 for (InfoSub::ref isrListener : notify)
3605 jvObj.visit(
3606 isrListener->getApiVersion(), //
3607 [&](Json::Value const& jv) { isrListener->send(jv, true); });
3608
3609 XRPL_ASSERT(
3610 jvObj.isMember(jss::account_history_tx_stream) ==
3612 "ripple::NetworkOPs::pubProposedAccountTransaction : "
3613 "account_history_tx_stream not set");
3614 for (auto& info : accountHistoryNotify)
3615 {
3616 auto& index = info.index_;
3617 if (index->forwardTxIndex_ == 0 && !index->haveHistorical_)
3618 jvObj.set(jss::account_history_tx_first, true);
3619 jvObj.set(jss::account_history_tx_index, index->forwardTxIndex_++);
3620 jvObj.visit(
3621 info.sink_->getApiVersion(), //
3622 [&](Json::Value const& jv) { info.sink_->send(jv, true); });
3623 }
3624 }
3625}
3626
3627//
3628// Monitoring
3629//
3630
// Subscribe a listener to transaction events for a set of accounts.
// @param isrListener  the subscriber
// @param vnaAccountIDs accounts to watch
// @param rt true => real-time/proposed map, false => accepted map
// NOTE(review): the name line (3632) is elided in this capture.
3631void
3633 InfoSub::ref isrListener,
3634 hash_set<AccountID> const& vnaAccountIDs,
3635 bool rt)
3636{
3637 SubInfoMapType& subMap = rt ? mSubRTAccount : mSubAccount;
3638
    // Record the subscription on the InfoSub itself first (so the
    // listener can later unsubscribe everything it registered).
3639 for (auto const& naAccountID : vnaAccountIDs)
3640 {
3641 JLOG(m_journal.trace())
3642 << "subAccount: account: " << toBase58(naAccountID);
3643
3644 isrListener->insertSubAccountInfo(naAccountID, rt);
3645 }
3646
    // NOTE(review): line 3647 is elided — presumably the lock guarding
    // the subscription maps.
3648
3649 for (auto const& naAccountID : vnaAccountIDs)
3650 {
3651 auto simIterator = subMap.find(naAccountID);
3652 if (simIterator == subMap.end())
3653 {
3654 // Not found, note that account has a new single listner.
3655 SubMapType usisElement;
3656 usisElement[isrListener->getSeq()] = isrListener;
3657 // VFALCO NOTE This is making a needless copy of naAccountID
3658 subMap.insert(simIterator, make_pair(naAccountID, usisElement));
3659 }
3660 else
3661 {
3662 // Found, note that the account has another listener.
3663 simIterator->second[isrListener->getSeq()] = isrListener;
3664 }
3665 }
3666}
3667
// Unsubscribe a listener from transaction events for a set of accounts:
// removes the records on the InfoSub, then the server-side map entries.
// NOTE(review): the name line (3669) is elided in this capture.
3668void
3670 InfoSub::ref isrListener,
3671 hash_set<AccountID> const& vnaAccountIDs,
3672 bool rt)
3673{
3674 for (auto const& naAccountID : vnaAccountIDs)
3675 {
3676 // Remove from the InfoSub
3677 isrListener->deleteSubAccountInfo(naAccountID, rt);
3678 }
3679
3680 // Remove from the server
3681 unsubAccountInternal(isrListener->getSeq(), vnaAccountIDs, rt);
3682}
3683
// Remove a subscriber (identified by its sequence number) from the
// server-side per-account subscription map.
// @param uSeq subscriber sequence number (InfoSub::getSeq())
// @param rt true => real-time/proposed map, false => accepted map
// NOTE(review): the name line (3685) and the lock line (3690) are
// elided in this capture.
3684void
3686 std::uint64_t uSeq,
3687 hash_set<AccountID> const& vnaAccountIDs,
3688 bool rt)
3689{
3691
3692 SubInfoMapType& subMap = rt ? mSubRTAccount : mSubAccount;
3693
3694 for (auto const& naAccountID : vnaAccountIDs)
3695 {
3696 auto simIterator = subMap.find(naAccountID);
3697
3698 if (simIterator != subMap.end())
3699 {
3700 // Found
3701 simIterator->second.erase(uSeq);
3702
    // Drop the whole account entry once its last listener is gone.
3703 if (simIterator->second.empty())
3704 {
3705 // Don't need hash entry.
3706 subMap.erase(simIterator);
3707 }
3708 }
3709 }
3710}
3711
3712void
3714{
3715 enum DatabaseType { Sqlite, None };
3716 static auto const databaseType = [&]() -> DatabaseType {
3717 // Use a dynamic_cast to return DatabaseType::None
3718 // on failure.
3719 if (dynamic_cast<SQLiteDatabase*>(&app_.getRelationalDatabase()))
3720 {
3721 return DatabaseType::Sqlite;
3722 }
3723 return DatabaseType::None;
3724 }();
3725
3726 if (databaseType == DatabaseType::None)
3727 {
3728 // LCOV_EXCL_START
3729 UNREACHABLE(
3730 "ripple::NetworkOPsImp::addAccountHistoryJob : no database");
3731 JLOG(m_journal.error())
3732 << "AccountHistory job for account "
3733 << toBase58(subInfo.index_->accountId_) << " no database";
3734 if (auto sptr = subInfo.sinkWptr_.lock(); sptr)
3735 {
3736 sptr->send(rpcError(rpcINTERNAL), true);
3737 unsubAccountHistory(sptr, subInfo.index_->accountId_, false);
3738 }
3739 return;
3740 // LCOV_EXCL_STOP
3741 }
3742
3745 "AccountHistoryTxStream",
3746 [this, dbType = databaseType, subInfo]() {
3747 auto const& accountId = subInfo.index_->accountId_;
3748 auto& lastLedgerSeq = subInfo.index_->historyLastLedgerSeq_;
3749 auto& txHistoryIndex = subInfo.index_->historyTxIndex_;
3750
3751 JLOG(m_journal.trace())
3752 << "AccountHistory job for account " << toBase58(accountId)
3753 << " started. lastLedgerSeq=" << lastLedgerSeq;
3754
3755 auto isFirstTx = [&](std::shared_ptr<Transaction> const& tx,
3756 std::shared_ptr<TxMeta> const& meta) -> bool {
3757 /*
3758 * genesis account: first tx is the one with seq 1
3759 * other account: first tx is the one created the account
3760 */
3761 if (accountId == genesisAccountId)
3762 {
3763 auto stx = tx->getSTransaction();
3764 if (stx->getAccountID(sfAccount) == accountId &&
3765 stx->getSeqValue() == 1)
3766 return true;
3767 }
3768
3769 for (auto& node : meta->getNodes())
3770 {
3771 if (node.getFieldU16(sfLedgerEntryType) != ltACCOUNT_ROOT)
3772 continue;
3773
3774 if (node.isFieldPresent(sfNewFields))
3775 {
3776 if (auto inner = dynamic_cast<STObject const*>(
3777 node.peekAtPField(sfNewFields));
3778 inner)
3779 {
3780 if (inner->isFieldPresent(sfAccount) &&
3781 inner->getAccountID(sfAccount) == accountId)
3782 {
3783 return true;
3784 }
3785 }
3786 }
3787 }
3788
3789 return false;
3790 };
3791
3792 auto send = [&](Json::Value const& jvObj,
3793 bool unsubscribe) -> bool {
3794 if (auto sptr = subInfo.sinkWptr_.lock())
3795 {
3796 sptr->send(jvObj, true);
3797 if (unsubscribe)
3798 unsubAccountHistory(sptr, accountId, false);
3799 return true;
3800 }
3801
3802 return false;
3803 };
3804
3805 auto sendMultiApiJson = [&](MultiApiJson const& jvObj,
3806 bool unsubscribe) -> bool {
3807 if (auto sptr = subInfo.sinkWptr_.lock())
3808 {
3809 jvObj.visit(
3810 sptr->getApiVersion(), //
3811 [&](Json::Value const& jv) { sptr->send(jv, true); });
3812
3813 if (unsubscribe)
3814 unsubAccountHistory(sptr, accountId, false);
3815 return true;
3816 }
3817
3818 return false;
3819 };
3820
3821 auto getMoreTxns =
3822 [&](std::uint32_t minLedger,
3823 std::uint32_t maxLedger,
3828 switch (dbType)
3829 {
3830 case Sqlite: {
3831 auto db = static_cast<SQLiteDatabase*>(
3834 accountId, minLedger, maxLedger, marker, 0, true};
3835 return db->newestAccountTxPage(options);
3836 }
3837 // LCOV_EXCL_START
3838 default: {
3839 UNREACHABLE(
3840 "ripple::NetworkOPsImp::addAccountHistoryJob : "
3841 "getMoreTxns : invalid database type");
3842 return {};
3843 }
3844 // LCOV_EXCL_STOP
3845 }
3846 };
3847
3848 /*
3849 * search backward until the genesis ledger or asked to stop
3850 */
3851 while (lastLedgerSeq >= 2 && !subInfo.index_->stopHistorical_)
3852 {
3853 int feeChargeCount = 0;
3854 if (auto sptr = subInfo.sinkWptr_.lock(); sptr)
3855 {
3856 sptr->getConsumer().charge(Resource::feeMediumBurdenRPC);
3857 ++feeChargeCount;
3858 }
3859 else
3860 {
3861 JLOG(m_journal.trace())
3862 << "AccountHistory job for account "
3863 << toBase58(accountId) << " no InfoSub. Fee charged "
3864 << feeChargeCount << " times.";
3865 return;
3866 }
3867
3868 // try to search in 1024 ledgers till reaching genesis ledgers
3869 auto startLedgerSeq =
3870 (lastLedgerSeq > 1024 + 2 ? lastLedgerSeq - 1024 : 2);
3871 JLOG(m_journal.trace())
3872 << "AccountHistory job for account " << toBase58(accountId)
3873 << ", working on ledger range [" << startLedgerSeq << ","
3874 << lastLedgerSeq << "]";
3875
3876 auto haveRange = [&]() -> bool {
3877 std::uint32_t validatedMin = UINT_MAX;
3878 std::uint32_t validatedMax = 0;
3879 auto haveSomeValidatedLedgers =
3881 validatedMin, validatedMax);
3882
3883 return haveSomeValidatedLedgers &&
3884 validatedMin <= startLedgerSeq &&
3885 lastLedgerSeq <= validatedMax;
3886 }();
3887
3888 if (!haveRange)
3889 {
3890 JLOG(m_journal.debug())
3891 << "AccountHistory reschedule job for account "
3892 << toBase58(accountId) << ", incomplete ledger range ["
3893 << startLedgerSeq << "," << lastLedgerSeq << "]";
3895 return;
3896 }
3897
3899 while (!subInfo.index_->stopHistorical_)
3900 {
3901 auto dbResult =
3902 getMoreTxns(startLedgerSeq, lastLedgerSeq, marker);
3903 if (!dbResult)
3904 {
3905 // LCOV_EXCL_START
3906 UNREACHABLE(
3907 "ripple::NetworkOPsImp::addAccountHistoryJob : "
3908 "getMoreTxns failed");
3909 JLOG(m_journal.debug())
3910 << "AccountHistory job for account "
3911 << toBase58(accountId) << " getMoreTxns failed.";
3912 send(rpcError(rpcINTERNAL), true);
3913 return;
3914 // LCOV_EXCL_STOP
3915 }
3916
3917 auto const& txns = dbResult->first;
3918 marker = dbResult->second;
3919 size_t num_txns = txns.size();
3920 for (size_t i = 0; i < num_txns; ++i)
3921 {
3922 auto const& [tx, meta] = txns[i];
3923
3924 if (!tx || !meta)
3925 {
3926 JLOG(m_journal.debug())
3927 << "AccountHistory job for account "
3928 << toBase58(accountId) << " empty tx or meta.";
3929 send(rpcError(rpcINTERNAL), true);
3930 return;
3931 }
3932 auto curTxLedger =
3934 tx->getLedger());
3935 if (!curTxLedger)
3936 {
3937 // LCOV_EXCL_START
3938 UNREACHABLE(
3939 "ripple::NetworkOPsImp::addAccountHistoryJob : "
3940 "getLedgerBySeq failed");
3941 JLOG(m_journal.debug())
3942 << "AccountHistory job for account "
3943 << toBase58(accountId) << " no ledger.";
3944 send(rpcError(rpcINTERNAL), true);
3945 return;
3946 // LCOV_EXCL_STOP
3947 }
3949 tx->getSTransaction();
3950 if (!stTxn)
3951 {
3952 // LCOV_EXCL_START
3953 UNREACHABLE(
3954 "NetworkOPsImp::addAccountHistoryJob : "
3955 "getSTransaction failed");
3956 JLOG(m_journal.debug())
3957 << "AccountHistory job for account "
3958 << toBase58(accountId)
3959 << " getSTransaction failed.";
3960 send(rpcError(rpcINTERNAL), true);
3961 return;
3962 // LCOV_EXCL_STOP
3963 }
3964
3965 auto const mRef = std::ref(*meta);
3966 auto const trR = meta->getResultTER();
3967 MultiApiJson jvTx =
3968 transJson(stTxn, trR, true, curTxLedger, mRef);
3969
3970 jvTx.set(
3971 jss::account_history_tx_index, txHistoryIndex--);
3972 if (i + 1 == num_txns ||
3973 txns[i + 1].first->getLedger() != tx->getLedger())
3974 jvTx.set(jss::account_history_boundary, true);
3975
3976 if (isFirstTx(tx, meta))
3977 {
3978 jvTx.set(jss::account_history_tx_first, true);
3979 sendMultiApiJson(jvTx, false);
3980
3981 JLOG(m_journal.trace())
3982 << "AccountHistory job for account "
3983 << toBase58(accountId)
3984 << " done, found last tx.";
3985 return;
3986 }
3987 else
3988 {
3989 sendMultiApiJson(jvTx, false);
3990 }
3991 }
3992
3993 if (marker)
3994 {
3995 JLOG(m_journal.trace())
3996 << "AccountHistory job for account "
3997 << toBase58(accountId)
3998 << " paging, marker=" << marker->ledgerSeq << ":"
3999 << marker->txnSeq;
4000 }
4001 else
4002 {
4003 break;
4004 }
4005 }
4006
4007 if (!subInfo.index_->stopHistorical_)
4008 {
4009 lastLedgerSeq = startLedgerSeq - 1;
4010 if (lastLedgerSeq <= 1)
4011 {
4012 JLOG(m_journal.trace())
4013 << "AccountHistory job for account "
4014 << toBase58(accountId)
4015 << " done, reached genesis ledger.";
4016 return;
4017 }
4018 }
4019 }
4020 });
4021}
4022
// Begin historical-transaction streaming for an account_history
// subscription: record the "separation" ledger sequence (the validated
// ledger at subscription time), verify the account exists in that ledger,
// special-case the genesis account, then schedule the backward
// history-search job via addAccountHistoryJob().
// NOTE(review): original lines 4024/4026 (function name and second
// parameter, referenced below as subInfo) are elided in this extract --
// confirm the signature against the full source.
4023void
4025    std::shared_ptr<ReadView const> const& ledger,
4027{
4028    subInfo.index_->separationLedgerSeq_ = ledger->seq();
4029    auto const& accountId = subInfo.index_->accountId_;
4030    auto const accountKeylet = keylet::account(accountId);
4031    if (!ledger->exists(accountKeylet))
4032    {
4033        JLOG(m_journal.debug())
4034            << "subAccountHistoryStart, no account " << toBase58(accountId)
4035            << ", no need to add AccountHistory job.";
4036        return;
4037    }
4038    if (accountId == genesisAccountId)
4039    {
4040        if (auto const sleAcct = ledger->read(accountKeylet); sleAcct)
4041        {
            // A genesis account whose Sequence is still 1 has never
            // submitted a transaction, so there is no history to stream.
4042            if (sleAcct->getFieldU32(sfSequence) == 1)
4043            {
4044                JLOG(m_journal.debug())
4045                    << "subAccountHistoryStart, genesis account "
4046                    << toBase58(accountId)
4047                    << " does not have tx, no need to add AccountHistory job.";
4048                return;
4049            }
4050        }
4051        else
4052        {
4053            // LCOV_EXCL_START
4054            UNREACHABLE(
4055                "ripple::NetworkOPsImp::subAccountHistoryStart : failed to "
4056                "access genesis account");
4057            return;
4058            // LCOV_EXCL_STOP
4059        }
4060    }
4061    subInfo.index_->historyLastLedgerSeq_ = ledger->seq();
4062    subInfo.index_->haveHistorical_ = true;
4063
4064    JLOG(m_journal.debug())
4065        << "subAccountHistoryStart, add AccountHistory job: accountId="
4066        << toBase58(accountId) << ", currentLedgerSeq=" << ledger->seq();
4067
4068    addAccountHistoryJob(subInfo);
4069}
4070
// Subscribe a listener to an account's transaction history stream.
// Returns rpcINVALID_PARAMS if the listener is already subscribed to this
// account, rpcSUCCESS otherwise.  If a validated ledger exists, historical
// streaming starts immediately; otherwise it is deferred until one arrives.
// NOTE(review): original lines 4071-4072 (return type / function name),
// 4084-4085 (lock + SubAccountHistoryInfo `ahi` declaration), 4090 (inner
// map declaration) and 4092 (mSubAccountHistory.emplace_hint call) are
// elided in this extract -- confirm against the full source.
4073    InfoSub::ref isrListener,
4074    AccountID const& accountId)
4075{
4076    if (!isrListener->insertSubAccountHistory(accountId))
4077    {
4078        JLOG(m_journal.debug())
4079            << "subAccountHistory, already subscribed to account "
4080            << toBase58(accountId);
4081        return rpcINVALID_PARAMS;
4082    }
4083
4086        isrListener, std::make_shared<SubAccountHistoryIndex>(accountId)};
4087    auto simIterator = mSubAccountHistory.find(accountId);
4088    if (simIterator == mSubAccountHistory.end())
4089    {
        // First subscriber for this account: create the per-account map.
4091        inner.emplace(isrListener->getSeq(), ahi);
4093            simIterator, std::make_pair(accountId, inner));
4094    }
4095    else
4096    {
        // Account already has subscribers: add this listener by its seq.
4097        simIterator->second.emplace(isrListener->getSeq(), ahi);
4098    }
4099
4100    auto const ledger = app_.getLedgerMaster().getValidatedLedger();
4101    if (ledger)
4102    {
4103        subAccountHistoryStart(ledger, ahi);
4104    }
4105    else
4106    {
4107        // The node does not have validated ledgers, so wait for
4108        // one before start streaming.
4109        // In this case, the subscription is also considered successful.
4110        JLOG(m_journal.debug())
4111            << "subAccountHistory, no validated ledger yet, delay start";
4112    }
4113
4114    return rpcSUCCESS;
4115}
4116
// Unsubscribe a listener from an account's history stream.  When
// historyOnly is true only the historical back-fill is stopped and the
// live subscription on the listener is kept; otherwise the listener's
// per-account record is removed as well.
// NOTE(review): original line 4118 (function name) is elided in this
// extract.
4117void
4119    InfoSub::ref isrListener,
4120    AccountID const& account,
4121    bool historyOnly)
4122{
4123    if (!historyOnly)
4124        isrListener->deleteSubAccountHistory(account);
4125    unsubAccountHistoryInternal(isrListener->getSeq(), account, historyOnly);
4126}
4127
// Internal worker for unsubAccountHistory, keyed by listener sequence
// number.  Always signals the running history job to stop (stopHistorical_);
// additionally erases the subscription entry -- and the whole per-account
// map if it becomes empty -- unless historyOnly is set.
// NOTE(review): original line 4129 (function name) and 4134 (presumably a
// lock guarding mSubAccountHistory -- confirm) are elided in this extract.
4128void
4130    std::uint64_t seq,
4131    AccountID const& account,
4132    bool historyOnly)
4133{
4135    auto simIterator = mSubAccountHistory.find(account);
4136    if (simIterator != mSubAccountHistory.end())
4137    {
4138        auto& subInfoMap = simIterator->second;
4139        auto subInfoIter = subInfoMap.find(seq);
4140        if (subInfoIter != subInfoMap.end())
4141        {
            // Tell the background AccountHistory job to stop searching.
4142            subInfoIter->second.index_->stopHistorical_ = true;
4143        }
4144
4145        if (!historyOnly)
4146        {
4147            simIterator->second.erase(seq);
4148            if (simIterator->second.empty())
4149            {
4150                mSubAccountHistory.erase(simIterator);
4151            }
4152        }
4153        JLOG(m_journal.debug())
4154            << "unsubAccountHistory, account " << toBase58(account)
4155            << ", historyOnly = " << (historyOnly ? "true" : "false");
4156    }
4157}
4158
// Subscribe a listener to an order book's update stream.  Always returns
// true; a null listener list from makeBookListeners is treated as
// unreachable.
// NOTE(review): original line 4160 (signature) is elided in this extract.
4159bool
4161{
4162    if (auto listeners = app_.getOrderBookDB().makeBookListeners(book))
4163        listeners->addSubscriber(isrListener);
4164    else
4165    {
4166        // LCOV_EXCL_START
4167        UNREACHABLE("ripple::NetworkOPsImp::subBook : null book listeners");
4168        // LCOV_EXCL_STOP
4169    }
4170    return true;
4171}
4172
// Remove subscriber uSeq from an order book's listener list, if the book
// has listeners.  Always returns true.
// NOTE(review): original line 4174 (signature) is elided in this extract.
4173bool
4175{
4176    if (auto listeners = app_.getOrderBookDB().getBookListeners(book))
4177        listeners->removeSubscriber(uSeq);
4178
4179    return true;
4180}
4181
// Manually close and accept the current ledger (standalone mode only,
// driven by the `ledger_accept` RPC).  Returns the sequence of the new
// current (open) ledger.
// NOTE(review): original lines 4182-4184 (return type and signature,
// including the consensusDelay parameter used below) are elided in this
// extract.
4185{
4186    // This code-path is exclusively used when the server is in standalone
4187    // mode via `ledger_accept`
4188    XRPL_ASSERT(
4189        m_standalone, "ripple::NetworkOPsImp::acceptLedger : is standalone");
4190
4191    if (!m_standalone)
4192        Throw<std::runtime_error>(
4193            "Operation only possible in STANDALONE mode.");
4194
4195    // FIXME Could we improve on this and remove the need for a specialized
4196    // API in Consensus?
4197    beginConsensus(m_ledgerMaster.getClosedLedger()->info().hash, {});
4198    mConsensus.simulate(app_.timeKeeper().closeTime(), consensusDelay);
4199    return m_ledgerMaster.getCurrentLedger()->info().seq;
4200}
4201
4202// <-- bool: true=added, false=already there
// Subscribe a listener to the ledger stream.  Fills jvResult with a
// snapshot of the latest validated ledger (index, hash, close time, fee
// and reserve settings, network id) so the client has a baseline before
// streaming starts.  Returns true if newly added, false if already there.
// NOTE(review): original lines 4204 (signature), 4220 (condition guarding
// the validated_ledgers field), 4223 (its value expression) and 4226
// (presumably a lock on the stream maps -- confirm) are elided in this
// extract.
4203bool
4205{
4206    if (auto lpClosed = m_ledgerMaster.getValidatedLedger())
4207    {
4208        jvResult[jss::ledger_index] = lpClosed->info().seq;
4209        jvResult[jss::ledger_hash] = to_string(lpClosed->info().hash);
4210        jvResult[jss::ledger_time] = Json::Value::UInt(
4211            lpClosed->info().closeTime.time_since_epoch().count());
        // fee_ref is only reported on networks without the XRPFees
        // amendment; it is deprecated.
4212        if (!lpClosed->rules().enabled(featureXRPFees))
4213            jvResult[jss::fee_ref] = Config::FEE_UNITS_DEPRECATED;
4214        jvResult[jss::fee_base] = lpClosed->fees().base.jsonClipped();
4215        jvResult[jss::reserve_base] = lpClosed->fees().reserve.jsonClipped();
4216        jvResult[jss::reserve_inc] = lpClosed->fees().increment.jsonClipped();
4217        jvResult[jss::network_id] = app_.config().NETWORK_ID;
4218    }
4219
4221    {
4222        jvResult[jss::validated_ledgers] =
4224    }
4225
4227    return mStreamMaps[sLedger]
4228        .emplace(isrListener->getSeq(), isrListener)
4229        .second;
4230}
4231
4232// <-- bool: true=added, false=already there
4233bool
4235{
4238 .emplace(isrListener->getSeq(), isrListener)
4239 .second;
4240}
4241
4242// <-- bool: true=erased, false=was not there
// Remove listener uSeq from the ledger stream; returns true if an entry
// was erased.
// NOTE(review): original lines 4244 (signature) and 4246 (presumably a
// lock -- confirm) are elided in this extract.
4243bool
4245{
4247    return mStreamMaps[sLedger].erase(uSeq);
4248}
4249
4250// <-- bool: true=erased, false=was not there
4251bool
4257
4258// <-- bool: true=added, false=already there
// Subscribe a listener to the manifests stream.  Returns true if newly
// added, false if already there.
// NOTE(review): original lines 4260 (signature) and 4262 (presumably a
// lock -- confirm) are elided in this extract.
4259bool
4261{
4263    return mStreamMaps[sManifests]
4264        .emplace(isrListener->getSeq(), isrListener)
4265        .second;
4266}
4267
4268// <-- bool: true=erased, false=was not there
4269bool
4275
4276// <-- bool: true=added, false=already there
// Subscribe a listener to the server-status stream.  Fills jvResult with
// an initial snapshot: operating mode, load base/factor, host id, node
// public key, and a random value.  Returns true if newly added, false if
// already there.
// NOTE(review): original lines 4278 (function name), 4298 (the
// pubkey_node value expression) and 4300 (presumably a lock -- confirm)
// are elided in this extract.
4277bool
4279    InfoSub::ref isrListener,
4280    Json::Value& jvResult,
4281    bool admin)
4282{
4283    uint256 uRandom;
4284
4285    if (m_standalone)
4286        jvResult[jss::stand_alone] = m_standalone;
4287
4288    // CHECKME: is it necessary to provide a random number here?
4289    beast::rngfill(uRandom.begin(), uRandom.size(), crypto_prng());
4290
4291    auto const& feeTrack = app_.getFeeTrack();
4292    jvResult[jss::random] = to_string(uRandom);
4293    jvResult[jss::server_status] = strOperatingMode(admin);
4294    jvResult[jss::load_base] = feeTrack.getLoadBase();
4295    jvResult[jss::load_factor] = feeTrack.getLoadFactor();
4296    jvResult[jss::hostid] = getHostId(admin);
4297    jvResult[jss::pubkey_node] =
4299
4301    return mStreamMaps[sServer]
4302        .emplace(isrListener->getSeq(), isrListener)
4303        .second;
4304}
4305
4306// <-- bool: true=erased, false=was not there
// Remove listener uSeq from the server-status stream; returns true if an
// entry was erased.
// NOTE(review): original lines 4308 (signature) and 4310 (presumably a
// lock -- confirm) are elided in this extract.
4307bool
4309{
4311    return mStreamMaps[sServer].erase(uSeq);
4312}
4313
4314// <-- bool: true=added, false=already there
4315bool
4317{
4320 .emplace(isrListener->getSeq(), isrListener)
4321 .second;
4322}
4323
4324// <-- bool: true=erased, false=was not there
4325bool
4331
4332// <-- bool: true=added, false=already there
4333bool
4335{
4338 .emplace(isrListener->getSeq(), isrListener)
4339 .second;
4340}
4341
4342// <-- bool: true=erased, false=was not there
4343bool
4349
4350// <-- bool: true=added, false=already there
4351bool
4353{
4356 .emplace(isrListener->getSeq(), isrListener)
4357 .second;
4358}
4359
4360void
4365
4366// <-- bool: true=erased, false=was not there
4367bool
4373
4374// <-- bool: true=added, false=already there
4375bool
4377{
4379 return mStreamMaps[sPeerStatus]
4380 .emplace(isrListener->getSeq(), isrListener)
4381 .second;
4382}
4383
4384// <-- bool: true=erased, false=was not there
4385bool
4391
4392// <-- bool: true=added, false=already there
4393bool
4395{
4398 .emplace(isrListener->getSeq(), isrListener)
4399 .second;
4400}
4401
4402// <-- bool: true=erased, false=was not there
4403bool
4409
// Look up an RPC subscription entry by its URL; returns a null
// InfoSub::pointer when no entry exists.
// NOTE(review): original lines 4410-4411 (signature) and 4413 (presumably
// a lock on mRpcSubMap -- confirm) are elided in this extract.
4412{
4414
4415    subRpcMapType::iterator it = mRpcSubMap.find(strUrl);
4416
4417    if (it != mRpcSubMap.end())
4418        return it->second;
4419
4420    return InfoSub::pointer();
4421}
4422
// Register an RPC subscription entry under the given URL and return it.
// Uses emplace, so an existing entry for the same URL is left unchanged.
// NOTE(review): original lines 4423-4424 (signature) and 4426 (presumably
// a lock on mRpcSubMap -- confirm) are elided in this extract.
4425{
4427
4428    mRpcSubMap.emplace(strUrl, rspEntry);
4430    return rspEntry;
4431}
4432
// Remove the RPC subscription registered under strUrl, but only when no
// stream map still references the listener (by its sequence number).
// Returns true on removal, false if the URL is unknown or still in use.
// NOTE(review): original lines 4434 (signature) and 4436 (presumably a
// lock -- confirm) are elided in this extract.
4433bool
4435{
4437    auto pInfo = findRpcSub(strUrl);
4438
4439    if (!pInfo)
4440        return false;
4441
4442    // check to see if any of the stream maps still hold a weak reference to
4443    // this entry before removing
4444    for (SubMapType const& map : mStreamMaps)
4445    {
4446        if (map.find(pInfo->getSeq()) != map.end())
4447            return false;
4448    }
4449    mRpcSubMap.erase(strUrl);
4450    return true;
4451}
4452
4453#ifndef USE_NEW_BOOK_PAGE
4454
4455// NIKB FIXME this should be looked at. There's no reason why this shouldn't
4456// work, but it demonstrated poor performance.
4457//
// Build a JSON page of offers for one order book (the legacy
// implementation, compiled when USE_NEW_BOOK_PAGE is not defined).
// Walks the book's directory pages from best quality (uBookBase) toward
// uBookEnd, computing each offer's effective funding (taker_gets_funded /
// taker_pays_funded) with a running per-owner balance table and the
// issuer's transfer rate, and appends up to iLimit offers -- funded and
// unfunded -- to jvResult[jss::offers].
// NOTE(review): original lines 4459-4460 (signature, including the
// lpLedger parameter), 4471 (declaration of the umBalance running-balance
// map) and 4576 (an accountHolds argument, presumably the freeze-handling
// flag -- confirm) are elided in this extract.
4458void
4461    Book const& book,
4462    AccountID const& uTakerID,
4463    bool const bProof,
4464    unsigned int iLimit,
4465    Json::Value const& jvMarker,
4466    Json::Value& jvResult)
4467{ // CAUTION: This is the old get book page logic
4468    Json::Value& jvOffers =
4469        (jvResult[jss::offers] = Json::Value(Json::arrayValue));
4470
4472    uint256 const uBookBase = getBookBase(book);
4473    uint256 const uBookEnd = getQualityNext(uBookBase);
4474    uint256 uTipIndex = uBookBase;
4475
4476    if (auto stream = m_journal.trace())
4477    {
4478        stream << "getBookPage:" << book;
4479        stream << "getBookPage: uBookBase=" << uBookBase;
4480        stream << "getBookPage: uBookEnd=" << uBookEnd;
4481        stream << "getBookPage: uTipIndex=" << uTipIndex;
4482    }
4483
4484    ReadView const& view = *lpLedger;
4485
4486    bool const bGlobalFreeze = isGlobalFrozen(view, book.out.account) ||
4487        isGlobalFrozen(view, book.in.account);
4488
4489    bool bDone = false;
4490    bool bDirectAdvance = true;
4491
4492    std::shared_ptr<SLE const> sleOfferDir;
4493    uint256 offerIndex;
4494    unsigned int uBookEntry;
4495    STAmount saDirRate;
4496
4497    auto const rate = transferRate(view, book.out.account);
4498    auto viewJ = app_.journal("View");
4499
4500    while (!bDone && iLimit-- > 0)
4501    {
        // Advance to the next directory page (quality level) of the book.
4502        if (bDirectAdvance)
4503        {
4504            bDirectAdvance = false;
4505
4506            JLOG(m_journal.trace()) << "getBookPage: bDirectAdvance";
4507
4508            auto const ledgerIndex = view.succ(uTipIndex, uBookEnd);
4509            if (ledgerIndex)
4510                sleOfferDir = view.read(keylet::page(*ledgerIndex));
4511            else
4512                sleOfferDir.reset();
4513
4514            if (!sleOfferDir)
4515            {
4516                JLOG(m_journal.trace()) << "getBookPage: bDone";
4517                bDone = true;
4518            }
4519            else
4520            {
4521                uTipIndex = sleOfferDir->key();
                // All offers in this directory share one exchange rate,
                // encoded in the directory key's quality.
4522                saDirRate = amountFromQuality(getQuality(uTipIndex));
4523
4524                cdirFirst(view, uTipIndex, sleOfferDir, uBookEntry, offerIndex);
4525
4526                JLOG(m_journal.trace())
4527                    << "getBookPage: uTipIndex=" << uTipIndex;
4528                JLOG(m_journal.trace())
4529                    << "getBookPage: offerIndex=" << offerIndex;
4530            }
4531        }
4532
4533        if (!bDone)
4534        {
4535            auto sleOffer = view.read(keylet::offer(offerIndex));
4536
4537            if (sleOffer)
4538            {
4539                auto const uOfferOwnerID = sleOffer->getAccountID(sfAccount);
4540                auto const& saTakerGets = sleOffer->getFieldAmount(sfTakerGets);
4541                auto const& saTakerPays = sleOffer->getFieldAmount(sfTakerPays);
4542                STAmount saOwnerFunds;
4543                bool firstOwnerOffer(true);
4544
4545                if (book.out.account == uOfferOwnerID)
4546                {
4547                    // If an offer is selling issuer's own IOUs, it is fully
4548                    // funded.
4549                    saOwnerFunds = saTakerGets;
4550                }
4551                else if (bGlobalFreeze)
4552                {
4553                    // If either asset is globally frozen, consider all offers
4554                    // that aren't ours to be totally unfunded
4555                    saOwnerFunds.clear(book.out);
4556                }
4557                else
4558                {
4559                    auto umBalanceEntry = umBalance.find(uOfferOwnerID);
4560                    if (umBalanceEntry != umBalance.end())
4561                    {
4562                        // Found in running balance table.
4563
4564                        saOwnerFunds = umBalanceEntry->second;
4565                        firstOwnerOffer = false;
4566                    }
4567                    else
4568                    {
4569                        // Did not find balance in table.
4570
4571                        saOwnerFunds = accountHolds(
4572                            view,
4573                            uOfferOwnerID,
4574                            book.out.currency,
4575                            book.out.account,
4577                            viewJ);
4578
4579                        if (saOwnerFunds < beast::zero)
4580                        {
4581                            // Treat negative funds as zero.
4582
4583                            saOwnerFunds.clear();
4584                        }
4585                    }
4586                }
4587
4588                Json::Value jvOffer = sleOffer->getJson(JsonOptions::none);
4589
4590                STAmount saTakerGetsFunded;
4591                STAmount saOwnerFundsLimit = saOwnerFunds;
4592                Rate offerRate = parityRate;
4593
4594                if (rate != parityRate
4595                    // Have a transfer fee.
4596                    && uTakerID != book.out.account
4597                    // Not taking offers of own IOUs.
4598                    && book.out.account != uOfferOwnerID)
4599                // Offer owner not issuing own funds
4600                {
4601                    // Need to charge a transfer fee to offer owner.
4602                    offerRate = rate;
4603                    saOwnerFundsLimit = divide(saOwnerFunds, offerRate);
4604                }
4605
4606                if (saOwnerFundsLimit >= saTakerGets)
4607                {
4608                    // Sufficient funds no shenanigans.
4609                    saTakerGetsFunded = saTakerGets;
4610                }
4611                else
4612                {
4613                    // Only provide, if not fully funded.
4614
4615                    saTakerGetsFunded = saOwnerFundsLimit;
4616
4617                    saTakerGetsFunded.setJson(jvOffer[jss::taker_gets_funded]);
                    // The std::min expression's value is consumed via
                    // .setJson on the temporary, writing taker_pays_funded.
4618                    std::min(
4619                        saTakerPays,
4620                        multiply(
4621                            saTakerGetsFunded, saDirRate, saTakerPays.issue()))
4622                        .setJson(jvOffer[jss::taker_pays_funded]);
4623                }
4624
4625                STAmount saOwnerPays = (parityRate == offerRate)
4626                    ? saTakerGetsFunded
4627                    : std::min(
4628                          saOwnerFunds, multiply(saTakerGetsFunded, offerRate));
4629
4630                umBalance[uOfferOwnerID] = saOwnerFunds - saOwnerPays;
4631
4632                // Include all offers funded and unfunded
4633                Json::Value& jvOf = jvOffers.append(jvOffer);
4634                jvOf[jss::quality] = saDirRate.getText();
4635
4636                if (firstOwnerOffer)
4637                    jvOf[jss::owner_funds] = saOwnerFunds.getText();
4638            }
4639            else
4640            {
4641                JLOG(m_journal.warn()) << "Missing offer";
4642            }
4643
4644            if (!cdirNext(view, uTipIndex, sleOfferDir, uBookEntry, offerIndex))
4645            {
4646                bDirectAdvance = true;
4647            }
4648            else
4649            {
4650                JLOG(m_journal.trace())
4651                    << "getBookPage: offerIndex=" << offerIndex;
4652            }
4653        }
4654    }
4655
4656    // jvResult[jss::marker] = Json::Value(Json::arrayValue);
4657    // jvResult[jss::nodes] = Json::Value(Json::arrayValue);
4658}
4659
4660#else
4661
4662// This is the new code that uses the book iterators
4663// It has temporarily been disabled
4664
// Alternate getBookPage implementation using OrderBookIterator (compiled
// only when USE_NEW_BOOK_PAGE is defined; per the surrounding comment it
// is currently disabled).  Unlike the legacy path, it appends only funded
// offers and the taker's own offers.
// NOTE(review): original lines 4666-4667 (signature, including the
// lpLedger parameter), 4677 (declaration of the umBalance running-balance
// map) and 4727 (an accountHolds argument, presumably the freeze-handling
// flag -- confirm) are elided in this extract.
4665void
4668    Book const& book,
4669    AccountID const& uTakerID,
4670    bool const bProof,
4671    unsigned int iLimit,
4672    Json::Value const& jvMarker,
4673    Json::Value& jvResult)
4674{
4675    auto& jvOffers = (jvResult[jss::offers] = Json::Value(Json::arrayValue));
4676
4678
4679    MetaView lesActive(lpLedger, tapNONE, true);
4680    OrderBookIterator obIterator(lesActive, book);
4681
4682    auto const rate = transferRate(lesActive, book.out.account);
4683
4684    bool const bGlobalFreeze = lesActive.isGlobalFrozen(book.out.account) ||
4685        lesActive.isGlobalFrozen(book.in.account);
4686
4687    while (iLimit-- > 0 && obIterator.nextOffer())
4688    {
4689        SLE::pointer sleOffer = obIterator.getCurrentOffer();
4690        if (sleOffer)
4691        {
4692            auto const uOfferOwnerID = sleOffer->getAccountID(sfAccount);
4693            auto const& saTakerGets = sleOffer->getFieldAmount(sfTakerGets);
4694            auto const& saTakerPays = sleOffer->getFieldAmount(sfTakerPays);
4695            STAmount saDirRate = obIterator.getCurrentRate();
4696            STAmount saOwnerFunds;
4697
4698            if (book.out.account == uOfferOwnerID)
4699            {
4700                // If offer is selling issuer's own IOUs, it is fully funded.
4701                saOwnerFunds = saTakerGets;
4702            }
4703            else if (bGlobalFreeze)
4704            {
4705                // If either asset is globally frozen, consider all offers
4706                // that aren't ours to be totally unfunded
4707                saOwnerFunds.clear(book.out);
4708            }
4709            else
4710            {
4711                auto umBalanceEntry = umBalance.find(uOfferOwnerID);
4712
4713                if (umBalanceEntry != umBalance.end())
4714                {
4715                    // Found in running balance table.
4716
4717                    saOwnerFunds = umBalanceEntry->second;
4718                }
4719                else
4720                {
4721                    // Did not find balance in table.
4722
4723                    saOwnerFunds = lesActive.accountHolds(
4724                        uOfferOwnerID,
4725                        book.out.currency,
4726                        book.out.account,
4728
4729                    if (saOwnerFunds.isNegative())
4730                    {
4731                        // Treat negative funds as zero.
4732
4733                        saOwnerFunds.zero();
4734                    }
4735                }
4736            }
4737
4738            Json::Value jvOffer = sleOffer->getJson(JsonOptions::none);
4739
4740            STAmount saTakerGetsFunded;
4741            STAmount saOwnerFundsLimit = saOwnerFunds;
4742            Rate offerRate = parityRate;
4743
4744            if (rate != parityRate
4745                // Have a transfer fee.
4746                && uTakerID != book.out.account
4747                // Not taking offers of own IOUs.
4748                && book.out.account != uOfferOwnerID)
4749            // Offer owner not issuing own funds
4750            {
4751                // Need to charge a transfer fee to offer owner.
4752                offerRate = rate;
4753                saOwnerFundsLimit = divide(saOwnerFunds, offerRate);
4754            }
4755
4756            if (saOwnerFundsLimit >= saTakerGets)
4757            {
4758                // Sufficient funds no shenanigans.
4759                saTakerGetsFunded = saTakerGets;
4760            }
4761            else
4762            {
4763                // Only provide, if not fully funded.
4764                saTakerGetsFunded = saOwnerFundsLimit;
4765
4766                saTakerGetsFunded.setJson(jvOffer[jss::taker_gets_funded]);
4767
4768                // TODO(tom): the expression's value IS consumed -- .setJson
                // is called on the std::min temporary to fill
                // taker_pays_funded; the original comment questioned this.
4770                std::min(
4771                    saTakerPays,
4772                    multiply(saTakerGetsFunded, saDirRate, saTakerPays.issue()))
4773                    .setJson(jvOffer[jss::taker_pays_funded]);
4774            }
4775
4776            STAmount saOwnerPays = (parityRate == offerRate)
4777                ? saTakerGetsFunded
4778                : std::min(
4779                      saOwnerFunds, multiply(saTakerGetsFunded, offerRate));
4780
4781            umBalance[uOfferOwnerID] = saOwnerFunds - saOwnerPays;
4782
4783            if (!saOwnerFunds.isZero() || uOfferOwnerID == uTakerID)
4784            {
4785                // Only provide funded offers and offers of the taker.
4786                Json::Value& jvOf = jvOffers.append(jvOffer);
4787                jvOf[jss::quality] = saDirRate.getText();
4788            }
4789        }
4790    }
4791
4792    // jvResult[jss::marker] = Json::Value(Json::arrayValue);
4793    // jvResult[jss::nodes] = Json::Value(Json::arrayValue);
4794}
4795
4796#endif
4797
4798inline void
4800{
4801 auto [counters, mode, start, initialSync] = accounting_.getCounterData();
4802 auto const current = std::chrono::duration_cast<std::chrono::microseconds>(
4804 counters[static_cast<std::size_t>(mode)].dur += current;
4805
4808 counters[static_cast<std::size_t>(OperatingMode::DISCONNECTED)]
4809 .dur.count());
4811 counters[static_cast<std::size_t>(OperatingMode::CONNECTED)]
4812 .dur.count());
4814 counters[static_cast<std::size_t>(OperatingMode::SYNCING)].dur.count());
4816 counters[static_cast<std::size_t>(OperatingMode::TRACKING)]
4817 .dur.count());
4819 counters[static_cast<std::size_t>(OperatingMode::FULL)].dur.count());
4820
4822 counters[static_cast<std::size_t>(OperatingMode::DISCONNECTED)]
4823 .transitions);
4825 counters[static_cast<std::size_t>(OperatingMode::CONNECTED)]
4826 .transitions);
4828 counters[static_cast<std::size_t>(OperatingMode::SYNCING)].transitions);
4830 counters[static_cast<std::size_t>(OperatingMode::TRACKING)]
4831 .transitions);
4833 counters[static_cast<std::size_t>(OperatingMode::FULL)].transitions);
4834}
4835
// Record an operating-mode transition: bump the target mode's transition
// counter, capture the initial-sync duration on the first-ever entry into
// FULL, charge the elapsed time to the mode being left, then switch
// mode_/start_ to the new mode.  Guarded by mutex_.
// NOTE(review): original line 4837 (function name / parameter `om`) is
// elided in this extract.
4836void
4838{
4839    auto now = std::chrono::steady_clock::now();
4840
4841    std::lock_guard lock(mutex_);
4842    ++counters_[static_cast<std::size_t>(om)].transitions;
4843    if (om == OperatingMode::FULL &&
4844        counters_[static_cast<std::size_t>(om)].transitions == 1)
4845    {
        // First time reaching FULL: record time since process start as the
        // initial sync duration.
4846        initialSyncUs_ = std::chrono::duration_cast<std::chrono::microseconds>(
4847                             now - processStart_)
4848                             .count();
4849    }
4850    counters_[static_cast<std::size_t>(mode_)].dur +=
4851        std::chrono::duration_cast<std::chrono::microseconds>(now - start_);
4852
4853    mode_ = om;
4854    start_ = now;
4855}
4856
// Serialize state-accounting counters into `obj`: per-mode transition
// counts and cumulative durations (microseconds, as strings) under
// state_accounting, plus server_state_duration_us for the current mode
// and initial_sync_duration_us when an initial sync has completed.
// NOTE(review): original lines 4858 (function name / parameter `obj`),
// 4862 (the end of the duration_cast expression, presumably now - start
// -- confirm) and 4866 (the for-loop initialization) are elided in this
// extract.
4857void
4859{
4860    auto [counters, mode, start, initialSync] = getCounterData();
4861    auto const current = std::chrono::duration_cast<std::chrono::microseconds>(
4863    counters[static_cast<std::size_t>(mode)].dur += current;
4864
4865    obj[jss::state_accounting] = Json::objectValue;
4867         i <= static_cast<std::size_t>(OperatingMode::FULL);
4868         ++i)
4869    {
4870        obj[jss::state_accounting][states_[i]] = Json::objectValue;
4871        auto& state = obj[jss::state_accounting][states_[i]];
4872        state[jss::transitions] = std::to_string(counters[i].transitions);
4873        state[jss::duration_us] = std::to_string(counters[i].dur.count());
4874    }
4875    obj[jss::server_state_duration_us] = std::to_string(current.count());
4876    if (initialSync)
4877        obj[jss::initial_sync_duration_us] = std::to_string(initialSync);
4878}
4879
4880//------------------------------------------------------------------------------
4881
4884 Application& app,
4886 bool standalone,
4887 std::size_t minPeerCount,
4888 bool startvalid,
4889 JobQueue& job_queue,
4891 ValidatorKeys const& validatorKeys,
4892 boost::asio::io_context& io_svc,
4893 beast::Journal journal,
4894 beast::insight::Collector::ptr const& collector)
4895{
4897 app,
4898 clock,
4899 standalone,
4900 minPeerCount,
4901 startvalid,
4902 job_queue,
4904 validatorKeys,
4905 io_svc,
4906 journal,
4907 collector);
4908}
4909
4910} // namespace ripple
T any_of(T... args)
T back_inserter(T... args)
T begin(T... args)
Decorator for streaming out compact json.
Lightweight wrapper to tag static string.
Definition json_value.h:63
Represents a JSON value.
Definition json_value.h:149
Json::UInt UInt
Definition json_value.h:156
Value & append(Value const &value)
Append value to array at the end.
bool isMember(char const *key) const
Return true if the object has a member named key.
Value get(UInt index, Value const &defaultValue) const
If the array contains at least index+1 elements, returns the element value, otherwise returns default...
A generic endpoint for log messages.
Definition Journal.h:60
Stream error() const
Definition Journal.h:346
Stream debug() const
Definition Journal.h:328
Stream info() const
Definition Journal.h:334
Stream trace() const
Severity stream access functions.
Definition Journal.h:322
Stream warn() const
Definition Journal.h:340
A metric for measuring an integral value.
Definition Gauge.h:40
void set(value_type value) const
Set the value on the gauge.
Definition Gauge.h:68
A reference to a handler for performing polled collection.
Definition Hook.h:32
A transaction that is in a closed ledger.
boost::container::flat_set< AccountID > const & getAffected() const
std::shared_ptr< STTx const > const & getTxn() const
TxMeta const & getMeta() const
virtual std::optional< NetClock::time_point > firstUnsupportedExpected() const =0
virtual Config & config()=0
virtual Overlay & overlay()=0
virtual LoadFeeTrack & getFeeTrack()=0
virtual OpenLedger & openLedger()=0
virtual beast::Journal journal(std::string const &name)=0
virtual NodeStore::Database & getNodeStore()=0
virtual ServerHandler & getServerHandler()=0
virtual std::chrono::milliseconds getIOLatency()=0
virtual OrderBookDB & getOrderBookDB()=0
virtual TimeKeeper & timeKeeper()=0
virtual TaggedCache< uint256, AcceptedLedger > & getAcceptedLedgerCache()=0
virtual JobQueue & getJobQueue()=0
virtual InboundLedgers & getInboundLedgers()=0
virtual ValidatorList & validators()=0
virtual std::optional< PublicKey const > getValidationPublicKey() const =0
virtual LedgerMaster & getLedgerMaster()=0
virtual RelationalDatabase & getRelationalDatabase()=0
virtual ManifestCache & validatorManifests()=0
virtual TxQ & getTxQ()=0
virtual perf::PerfLog & getPerfLog()=0
virtual Cluster & cluster()=0
virtual AmendmentTable & getAmendmentTable()=0
virtual std::pair< PublicKey, SecretKey > const & nodeIdentity()=0
bool exists(std::string const &name) const
Returns true if a section with the given name exists.
Section & section(std::string const &name)
Returns the section with the given name.
Specifies an order book.
Definition Book.h:36
Issue in
Definition Book.h:38
Issue out
Definition Book.h:39
Holds transactions which were deferred to the next pass of consensus.
The role of a ClosureCounter is to assist in shutdown by letting callers wait for the completion of c...
std::string const & name() const
Definition ClusterNode.h:46
std::uint32_t getLoadFee() const
Definition ClusterNode.h:52
NetClock::time_point getReportTime() const
Definition ClusterNode.h:58
PublicKey const & identity() const
Definition ClusterNode.h:64
std::size_t size() const
The number of nodes in the cluster list.
Definition Cluster.cpp:49
uint32_t NETWORK_ID
Definition Config.h:156
std::string SERVER_DOMAIN
Definition Config.h:278
std::size_t NODE_SIZE
Definition Config.h:213
static constexpr std::uint32_t FEE_UNITS_DEPRECATED
Definition Config.h:160
int RELAY_UNTRUSTED_VALIDATIONS
Definition Config.h:169
virtual void clearFailures()=0
virtual Json::Value getInfo()=0
std::shared_ptr< InfoSub > pointer
Definition InfoSub.h:54
AccountID account
Definition Issue.h:36
Currency currency
Definition Issue.h:35
A pool of threads to perform work.
Definition JobQueue.h:58
Json::Value getJson(int c=0)
Definition JobQueue.cpp:214
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:168
std::shared_ptr< Ledger const > getValidatedLedger()
bool haveValidated()
Whether we have ever fully validated a ledger.
std::shared_ptr< ReadView const > getCurrentLedger()
bool getValidatedRange(std::uint32_t &minVal, std::uint32_t &maxVal)
std::shared_ptr< Ledger const > getClosedLedger()
std::string getCompleteLedgers()
std::size_t getFetchPackCacheSize() const
std::shared_ptr< ReadView const > getPublishedLedger()
std::shared_ptr< Ledger const > getLedgerBySeq(std::uint32_t index)
std::chrono::seconds getValidatedLedgerAge()
Manages the current fee schedule.
std::uint32_t getClusterFee() const
std::uint32_t getLocalFee() const
std::uint32_t getLoadBase() const
std::uint32_t getRemoteFee() const
std::uint32_t getLoadFactor() const
Manages load sources.
Definition LoadManager.h:46
void heartbeat()
Reset the stall detection timer.
PublicKey getMasterKey(PublicKey const &pk) const
Returns ephemeral signing key's master public key.
Definition Manifest.cpp:323
State accounting records two attributes for each possible server state: 1) Amount of time spent in ea...
void mode(OperatingMode om)
Record state transition.
void json(Json::Value &obj) const
Output state counters in JSON format.
std::array< Counters, 5 > counters_
std::chrono::steady_clock::time_point start_
static std::array< Json::StaticString const, 5 > const states_
std::chrono::steady_clock::time_point const processStart_
Transaction with input flags and results to be applied in batches.
TransactionStatus(std::shared_ptr< Transaction > t, bool a, bool l, FailHard f)
std::shared_ptr< Transaction > const transaction
boost::asio::steady_timer accountHistoryTxTimer_
void pubProposedTransaction(std::shared_ptr< ReadView const > const &ledger, std::shared_ptr< STTx const > const &transaction, TER result) override
OperatingMode getOperatingMode() const override
std::string strOperatingMode(OperatingMode const mode, bool const admin) const override
bool preProcessTransaction(std::shared_ptr< Transaction > &transaction)
std::vector< TransactionStatus > mTransactions
bool unsubBookChanges(std::uint64_t uListener) override
std::atomic< OperatingMode > mMode
Json::Value getLedgerFetchInfo() override
bool isUNLBlocked() override
RCLConsensus mConsensus
void unsubAccount(InfoSub::ref ispListener, hash_set< AccountID > const &vnaAccountIDs, bool rt) override
Json::Value getOwnerInfo(std::shared_ptr< ReadView const > lpLedger, AccountID const &account) override
void setNeedNetworkLedger() override
void setUNLBlocked() override
void pubConsensus(ConsensusPhase phase)
void transactionBatch()
Apply transactions in batches.
void apply(std::unique_lock< std::mutex > &batchLock)
Attempt to apply transactions and post-process based on the results.
void setAmendmentBlocked() override
bool checkLastClosedLedger(Overlay::PeerSequence const &, uint256 &networkClosed)
void processTransaction(std::shared_ptr< Transaction > &transaction, bool bUnlimited, bool bLocal, FailHard failType) override
Process transactions as they arrive from the network or which are submitted by clients.
void processTransactionSet(CanonicalTXSet const &set) override
Process a set of transactions synchronously, and ensuring that they are processed in one batch.
void clearUNLBlocked() override
boost::asio::steady_timer heartbeatTimer_
void updateLocalTx(ReadView const &view) override
bool unsubManifests(std::uint64_t uListener) override
DispatchState
Synchronization states for transaction batches.
std::optional< PublicKey > const validatorPK_
bool unsubTransactions(std::uint64_t uListener) override
void clearAmendmentWarned() override
std::size_t getLocalTxCount() override
std::unique_ptr< LocalTxs > m_localTX
bool subValidations(InfoSub::ref ispListener) override
bool subLedger(InfoSub::ref ispListener, Json::Value &jvResult) override
bool isAmendmentBlocked() override
void unsubAccountHistoryInternal(std::uint64_t seq, AccountID const &account, bool historyOnly) override
SubAccountHistoryMapType mSubAccountHistory
Json::Value getServerInfo(bool human, bool admin, bool counters) override
InfoSub::pointer addRpcSub(std::string const &strUrl, InfoSub::ref) override
boost::asio::steady_timer clusterTimer_
bool isAmendmentWarned() override
static std::array< char const *, 5 > const states_
bool subServer(InfoSub::ref ispListener, Json::Value &jvResult, bool admin) override
void unsubAccountInternal(std::uint64_t seq, hash_set< AccountID > const &vnaAccountIDs, bool rt) override
std::atomic< bool > amendmentBlocked_
beast::Journal m_journal
SubInfoMapType mSubAccount
std::optional< PublicKey > const validatorMasterPK_
void unsubAccountHistory(InfoSub::ref ispListener, AccountID const &account, bool historyOnly) override
unsubscribe an account's transactions
std::set< uint256 > pendingValidations_
bool beginConsensus(uint256 const &networkClosed, std::unique_ptr< std::stringstream > const &clog) override
void doTransactionAsync(std::shared_ptr< Transaction > transaction, bool bUnlimited, FailHard failtype)
For transactions not submitted by a locally connected client, fire and forget.
void setAccountHistoryJobTimer(SubAccountHistoryInfoWeak subInfo)
bool unsubValidations(std::uint64_t uListener) override
void endConsensus(std::unique_ptr< std::stringstream > const &clog) override
ClosureCounter< void, boost::system::error_code const & > waitHandlerCounter_
void pubLedger(std::shared_ptr< ReadView const > const &lpAccepted) override
void addAccountHistoryJob(SubAccountHistoryInfoWeak subInfo)
void doTransactionSync(std::shared_ptr< Transaction > transaction, bool bUnlimited, FailHard failType)
For transactions submitted directly by a client, apply batch of transactions and wait for this transa...
void setTimer(boost::asio::steady_timer &timer, std::chrono::milliseconds const &expiry_time, std::function< void()> onExpire, std::function< void()> onError)
std::array< SubMapType, SubTypes::sLastEntry > mStreamMaps
bool unsubPeerStatus(std::uint64_t uListener) override
void pubValidation(std::shared_ptr< STValidation > const &val) override
std::size_t const minPeerCount_
std::atomic< bool > unlBlocked_
bool subBook(InfoSub::ref ispListener, Book const &) override
std::uint32_t acceptLedger(std::optional< std::chrono::milliseconds > consensusDelay) override
Accepts the current transaction tree, return the new ledger's sequence.
void stateAccounting(Json::Value &obj) override
void submitTransaction(std::shared_ptr< STTx const > const &) override
bool unsubRTTransactions(std::uint64_t uListener) override
Json::Value getConsensusInfo() override
std::recursive_mutex mSubLock
std::atomic< bool > needNetworkLedger_
bool recvValidation(std::shared_ptr< STValidation > const &val, std::string const &source) override
void switchLastClosedLedger(std::shared_ptr< Ledger const > const &newLCL)
StateAccounting accounting_
void reportConsensusStateChange(ConsensusPhase phase)
bool subConsensus(InfoSub::ref ispListener) override
bool isNeedNetworkLedger() override
void setAmendmentWarned() override
bool processTrustedProposal(RCLCxPeerPos proposal) override
void doTransactionSyncBatch(std::unique_lock< std::mutex > &lock, std::function< bool(std::unique_lock< std::mutex > const &)> retryCallback)
bool subPeerStatus(InfoSub::ref ispListener) override
void mapComplete(std::shared_ptr< SHAMap > const &map, bool fromAcquire) override
bool tryRemoveRpcSub(std::string const &strUrl) override
void pubAccountTransaction(std::shared_ptr< ReadView const > const &ledger, AcceptedLedgerTx const &transaction, bool last)
LedgerMaster & m_ledgerMaster
void clearLedgerFetch() override
bool isBlocked() override
void consensusViewChange() override
void setStateTimer() override
Called to initially start our timers.
bool subManifests(InfoSub::ref ispListener) override
void pubValidatedTransaction(std::shared_ptr< ReadView const > const &ledger, AcceptedLedgerTx const &transaction, bool last)
void subAccount(InfoSub::ref ispListener, hash_set< AccountID > const &vnaAccountIDs, bool rt) override
bool unsubServer(std::uint64_t uListener) override
MultiApiJson transJson(std::shared_ptr< STTx const > const &transaction, TER result, bool validated, std::shared_ptr< ReadView const > const &ledger, std::optional< std::reference_wrapper< TxMeta const > > meta)
ServerFeeSummary mLastFeeSummary
void pubPeerStatus(std::function< Json::Value(void)> const &) override
void setStandAlone() override
bool subRTTransactions(InfoSub::ref ispListener) override
void pubProposedAccountTransaction(std::shared_ptr< ReadView const > const &ledger, std::shared_ptr< STTx const > const &transaction, TER result)
std::condition_variable mCond
void setMode(OperatingMode om) override
void stop() override
void getBookPage(std::shared_ptr< ReadView const > &lpLedger, Book const &, AccountID const &uTakerID, bool const bProof, unsigned int iLimit, Json::Value const &jvMarker, Json::Value &jvResult) override
void clearNeedNetworkLedger() override
NetworkOPsImp(Application &app, NetworkOPs::clock_type &clock, bool standalone, std::size_t minPeerCount, bool start_valid, JobQueue &job_queue, LedgerMaster &ledgerMaster, ValidatorKeys const &validatorKeys, boost::asio::io_context &io_svc, beast::Journal journal, beast::insight::Collector::ptr const &collector)
DispatchState mDispatchState
bool subBookChanges(InfoSub::ref ispListener) override
SubInfoMapType mSubRTAccount
void reportFeeChange() override
bool unsubBook(std::uint64_t uListener, Book const &) override
void subAccountHistoryStart(std::shared_ptr< ReadView const > const &ledger, SubAccountHistoryInfoWeak &subInfo)
bool isFull() override
error_code_i subAccountHistory(InfoSub::ref ispListener, AccountID const &account) override
subscribe an account's new transactions and retrieve the account's historical transactions
std::mutex validationsMutex_
void pubManifest(Manifest const &) override
ConsensusPhase mLastConsensusPhase
bool subTransactions(InfoSub::ref ispListener) override
subRpcMapType mRpcSubMap
std::atomic< bool > amendmentWarned_
InfoSub::pointer findRpcSub(std::string const &strUrl) override
bool unsubLedger(std::uint64_t uListener) override
std::string getHostId(bool forAdmin)
bool unsubConsensus(std::uint64_t uListener) override
Provides server functionality for clients.
Definition NetworkOPs.h:89
void getCountsJson(Json::Value &obj)
Definition Database.cpp:267
std::shared_ptr< OpenView const > current() const
Returns a view to the current open ledger.
Writable ledger view that accumulates state and tx changes.
Definition OpenView.h:65
BookListeners::pointer getBookListeners(Book const &)
void processTxn(std::shared_ptr< ReadView const > const &ledger, AcceptedLedgerTx const &alTx, MultiApiJson const &jvObj)
BookListeners::pointer makeBookListeners(Book const &)
virtual std::optional< std::uint32_t > networkID() const =0
Returns the ID of the network this server is configured for, if any.
virtual std::uint64_t getPeerDisconnect() const =0
virtual std::size_t size() const =0
Returns the number of active peers.
virtual std::uint64_t getJqTransOverflow() const =0
virtual std::uint64_t getPeerDisconnectCharges() const =0
Manages the generic consensus algorithm for use by the RCL.
std::size_t prevProposers() const
Get the number of proposing peers that participated in the previous round.
void simulate(NetClock::time_point const &now, std::optional< std::chrono::milliseconds > consensusDelay)
std::chrono::milliseconds prevRoundTime() const
Get duration of the previous round.
Json::Value getJson(bool full) const
A peer's signed, proposed position for use in RCLConsensus.
PublicKey const & publicKey() const
Public key of peer that sent the proposal.
Represents a set of transactions in RCLConsensus.
Definition RCLCxTx.h:63
Wraps a ledger instance for use in generic Validations LedgerTrie.
static std::string getWordFromBlob(void const *blob, size_t bytes)
Chooses a single dictionary word from the data.
Definition RFC1751.cpp:507
Collects logging information.
std::unique_ptr< std::stringstream > const & ss()
A view into a ledger.
Definition ReadView.h:51
virtual std::shared_ptr< SLE const > read(Keylet const &k) const =0
Return the state item associated with a key.
virtual std::optional< key_type > succ(key_type const &key, std::optional< key_type > const &last=std::nullopt) const =0
Return the key of the next state item.
void setJson(Json::Value &) const
Definition STAmount.cpp:643
std::string getText() const override
Definition STAmount.cpp:683
Issue const & issue() const
Definition STAmount.h:496
std::optional< T > get(std::string const &name) const
std::size_t size() const noexcept
Definition Serializer.h:72
void const * data() const noexcept
Definition Serializer.h:78
void setup(Setup const &setup, beast::Journal journal)
time_point now() const override
Returns the current time, using the server's clock.
Definition TimeKeeper.h:64
std::chrono::seconds closeOffset() const
Definition TimeKeeper.h:83
time_point closeTime() const
Returns the predicted close time, in network time.
Definition TimeKeeper.h:76
Metrics getMetrics(OpenView const &view) const
Returns fee metrics in reference fee level units.
Definition TxQ.cpp:1776
static time_point now()
Validator keys and manifest as set in configuration file.
std::size_t count() const
Return the number of configured validator list sites.
std::optional< PublicKey > getTrustedKey(PublicKey const &identity) const
Returns master public key if public key is trusted.
std::optional< PublicKey > localPublicKey() const
This function returns the local validator public key or a std::nullopt.
std::optional< TimeKeeper::time_point > expires() const
Return the time when the validator list will expire.
std::size_t quorum() const
Get quorum value for current trusted key set.
constexpr double decimalXRP() const
Definition XRPAmount.h:262
Json::Value jsonClipped() const
Definition XRPAmount.h:218
iterator begin()
Definition base_uint.h:136
static constexpr std::size_t size()
Definition base_uint.h:526
bool isZero() const
Definition base_uint.h:540
bool isNonZero() const
Definition base_uint.h:545
virtual Json::Value currentJson() const =0
Render currently executing jobs and RPC calls and durations in Json.
virtual Json::Value countersJson() const =0
Render performance counters in Json.
Automatically unlocks and re-locks a unique_lock object.
Definition scope.h:231
T clear(T... args)
T emplace_back(T... args)
T emplace(T... args)
T empty(T... args)
T end(T... args)
T erase(T... args)
T find(T... args)
T get(T... args)
T insert(T... args)
T is_same_v
T is_sorted(T... args)
T lock(T... args)
T make_pair(T... args)
T max(T... args)
T min(T... args)
@ arrayValue
array value (ordered list)
Definition json_value.h:44
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:45
int Int
unsigned int UInt
void rngfill(void *const buffer, std::size_t const bytes, Generator &g)
Definition rngfill.h:34
std::string const & getVersionString()
Server version.
Definition BuildInfo.cpp:68
std::optional< std::string > encodeCTID(uint32_t ledgerSeq, uint32_t txnIndex, uint32_t networkID) noexcept
Encodes ledger sequence, transaction index, and network ID into a CTID string.
Definition CTID.h:53
Json::Value computeBookChanges(std::shared_ptr< L const > const &lpAccepted)
Definition BookChanges.h:47
void insertNFTSyntheticInJson(Json::Value &, std::shared_ptr< STTx const > const &, TxMeta const &)
Adds common synthetic fields to transaction-related JSON responses.
void insertMPTokenIssuanceID(Json::Value &response, std::shared_ptr< STTx const > const &transaction, TxMeta const &transactionMeta)
void insertDeliveredAmount(Json::Value &meta, ReadView const &, std::shared_ptr< STTx const > const &serializedTx, TxMeta const &)
Add a delivered_amount field to the meta input/output parameter.
Charge const feeMediumBurdenRPC
TER valid(STTx const &tx, ReadView const &view, AccountID const &src, beast::Journal j)
Keylet account(AccountID const &id) noexcept
AccountID root.
Definition Indexes.cpp:184
Keylet page(uint256 const &root, std::uint64_t index=0) noexcept
A page in a directory.
Definition Indexes.cpp:380
Keylet offer(AccountID const &id, std::uint32_t seq) noexcept
An offer from an account.
Definition Indexes.cpp:274
Rate rate(Env &env, Account const &account, std::uint32_t const &seq)
Definition escrow.cpp:69
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:25
std::unique_ptr< NetworkOPs > make_NetworkOPs(Application &app, NetworkOPs::clock_type &clock, bool standalone, std::size_t minPeerCount, bool startvalid, JobQueue &job_queue, LedgerMaster &ledgerMaster, ValidatorKeys const &validatorKeys, boost::asio::io_context &io_svc, beast::Journal journal, beast::insight::Collector::ptr const &collector)
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
STAmount divide(STAmount const &amount, Rate const &rate)
Definition Rate2.cpp:93
std::shared_ptr< STTx const > sterilize(STTx const &stx)
Sterilize a transaction.
Definition STTx.cpp:861
STAmount accountFunds(ReadView const &view, AccountID const &id, STAmount const &saDefault, FreezeHandling freezeHandling, beast::Journal j)
Definition View.cpp:554
@ fhZERO_IF_FROZEN
Definition View.h:77
@ fhIGNORE_FREEZE
Definition View.h:77
std::uint64_t getQuality(uint256 const &uBase)
Definition Indexes.cpp:149
@ rpcSUCCESS
Definition ErrorCodes.h:44
@ rpcINVALID_PARAMS
Definition ErrorCodes.h:84
@ rpcINTERNAL
Definition ErrorCodes.h:130
std::pair< PublicKey, SecretKey > generateKeyPair(KeyType type, Seed const &seed)
Generate a key pair deterministically.
auto constexpr muldiv_max
Definition mulDiv.h:28
std::unique_ptr< LocalTxs > make_LocalTxs()
Definition LocalTxs.cpp:192
STAmount amountFromQuality(std::uint64_t rate)
Definition STAmount.cpp:984
void handleNewValidation(Application &app, std::shared_ptr< STValidation > const &val, std::string const &source, BypassAccept const bypassAccept, std::optional< beast::Journal > j)
Handle a new validation.
@ warnRPC_EXPIRED_VALIDATOR_LIST
Definition ErrorCodes.h:175
@ warnRPC_UNSUPPORTED_MAJORITY
Definition ErrorCodes.h:173
@ warnRPC_AMENDMENT_BLOCKED
Definition ErrorCodes.h:174
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
std::unique_ptr< FeeVote > make_FeeVote(FeeSetup const &setup, beast::Journal journal)
Create an instance of the FeeVote logic.
OperatingMode
Specifies the mode under which the server believes it's operating.
Definition NetworkOPs.h:68
@ TRACKING
convinced we agree with the network
@ DISCONNECTED
not ready to process requests
@ CONNECTED
convinced we are talking to the network
@ FULL
we have the ledger and can even validate
@ SYNCING
fallen slightly behind
STAmount multiply(STAmount const &amount, Rate const &rate)
Definition Rate2.cpp:53
AccountID calcAccountID(PublicKey const &pk)
@ current
This was a new validation and was added.
csprng_engine & crypto_prng()
The default cryptographically secure PRNG.
Json::Value rpcError(int iError)
Definition RPCErr.cpp:31
@ tefPAST_SEQ
Definition TER.h:175
bool isTefFailure(TER x) noexcept
Definition TER.h:666
ConsensusPhase
Phases of consensus for a single ledger round.
static std::array< char const *, 5 > const stateNames
std::string strHex(FwdIt begin, FwdIt end)
Definition strHex.h:30
Rate transferRate(ReadView const &view, AccountID const &issuer)
Returns IOU issuer transfer fee as Rate.
Definition View.cpp:762
void forAllApiVersions(Fn const &fn, Args &&... args)
Definition ApiVersion.h:177
bool isTerRetry(TER x) noexcept
Definition TER.h:672
send_if_pred< Predicate > send_if(std::shared_ptr< Message > const &m, Predicate const &f)
Helper function to aid in type deduction.
Definition predicates.h:75
@ tesSUCCESS
Definition TER.h:245
uint256 getQualityNext(uint256 const &uBase)
Definition Indexes.cpp:141
STAmount accountHolds(ReadView const &view, AccountID const &account, Currency const &currency, AccountID const &issuer, FreezeHandling zeroIfFrozen, beast::Journal j)
Definition View.cpp:387
bool isTesSuccess(TER x) noexcept
Definition TER.h:678
Rules makeRulesGivenLedger(DigestAwareReadView const &ledger, Rules const &current)
Definition ReadView.cpp:69
std::string to_string_iso(date::sys_time< Duration > tp)
Definition chrono.h:92
bool cdirFirst(ReadView const &view, uint256 const &root, std::shared_ptr< SLE const > &page, unsigned int &index, uint256 &entry)
Returns the first entry in the directory, advancing the index.
Definition View.cpp:145
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:630
FeeSetup setup_FeeVote(Section const &section)
Definition Config.cpp:1129
bool isTemMalformed(TER x) noexcept
Definition TER.h:660
Number root(Number f, unsigned d)
Definition Number.cpp:636
std::optional< std::uint64_t > mulDiv(std::uint64_t value, std::uint64_t mul, std::uint64_t div)
Return value*mul/div accurately.
@ tapFAIL_HARD
Definition ApplyView.h:35
@ tapUNLIMITED
Definition ApplyView.h:42
@ tapNONE
Definition ApplyView.h:31
Json::Value getJson(LedgerFill const &fill)
Return a new Json::Value representing the ledger with given options.
@ ledgerMaster
ledger master data for signing
@ proposal
proposal for signing
bool cdirNext(ReadView const &view, uint256 const &root, std::shared_ptr< SLE const > &page, unsigned int &index, uint256 &entry)
Returns the next entry in the directory, advancing the index.
Definition View.cpp:156
std::pair< Validity, std::string > checkValidity(HashRouter &router, STTx const &tx, Rules const &rules, Config const &config)
Checks transaction signature and local checks.
Definition apply.cpp:44
Seed generateSeed(std::string const &passPhrase)
Generate a seed deterministically.
Definition Seed.cpp:76
constexpr std::size_t maxPoppedTransactions
@ terQUEUED
Definition TER.h:225
bool transResultInfo(TER code, std::string &token, std::string &text)
Definition TER.cpp:249
@ jtNETOP_CLUSTER
Definition Job.h:75
@ jtCLIENT_FEE_CHANGE
Definition Job.h:47
@ jtTRANSACTION
Definition Job.h:62
@ jtTXN_PROC
Definition Job.h:82
@ jtCLIENT_CONSENSUS
Definition Job.h:48
@ jtBATCH
Definition Job.h:65
@ jtCLIENT_ACCT_HIST
Definition Job.h:49
bool isTelLocal(TER x) noexcept
Definition TER.h:654
uint256 getBookBase(Book const &book)
Definition Indexes.cpp:115
constexpr std::uint32_t tfInnerBatchTxn
Definition TxFlags.h:61
Rate const parityRate
A transfer rate signifying a 1:1 exchange.
bool isGlobalFrozen(ReadView const &view, AccountID const &issuer)
Definition View.cpp:182
static std::uint32_t trunc32(std::uint64_t v)
@ temINVALID_FLAG
Definition TER.h:111
@ temBAD_SIGNATURE
Definition TER.h:105
static auto const genesisAccountId
STL namespace.
T owns_lock(T... args)
T ref(T... args)
T reserve(T... args)
T reset(T... args)
T set_intersection(T... args)
T size(T... args)
T str(T... args)
std::string serialized
The manifest in serialized form.
Definition Manifest.h:83
std::uint32_t sequence
The sequence number of this manifest.
Definition Manifest.h:95
std::string domain
The domain, if one was specified in the manifest; empty otherwise.
Definition Manifest.h:98
std::optional< Blob > getSignature() const
Returns manifest signature.
Definition Manifest.cpp:244
std::optional< PublicKey > signingKey
The ephemeral key associated with this manifest.
Definition Manifest.h:92
Blob getMasterSignature() const
Returns manifest master key signature.
Definition Manifest.cpp:255
PublicKey masterKey
The master key associated with this manifest.
Definition Manifest.h:86
Server fees published on server subscription.
bool operator!=(ServerFeeSummary const &b) const
std::optional< TxQ::Metrics > em
bool operator==(ServerFeeSummary const &b) const
beast::insight::Gauge full_transitions
Stats(Handler const &handler, beast::insight::Collector::ptr const &collector)
beast::insight::Hook hook
beast::insight::Gauge connected_duration
beast::insight::Gauge tracking_duration
beast::insight::Gauge connected_transitions
beast::insight::Gauge disconnected_transitions
beast::insight::Gauge syncing_duration
beast::insight::Gauge tracking_transitions
beast::insight::Gauge full_duration
beast::insight::Gauge disconnected_duration
beast::insight::Gauge syncing_transitions
SubAccountHistoryIndex(AccountID const &accountId)
std::shared_ptr< SubAccountHistoryIndex > index_
std::shared_ptr< SubAccountHistoryIndex > index_
Represents a transfer rate.
Definition Rate.h:40
Data format for exchanging consumption information across peers.
Definition Gossip.h:32
std::vector< Item > items
Definition Gossip.h:44
Changes in trusted nodes after updating validator list.
hash_set< NodeID > added
hash_set< NodeID > removed
Structure returned by TxQ::getMetrics, expressed in reference fee level units.
Definition TxQ.h:165
IsMemberResult isMember(char const *key) const
void set(char const *key, auto const &v)
Select all peers (except optional excluded) that are in our cluster.
Definition predicates.h:137
Sends a message to all peers.
Definition predicates.h:32
T swap(T... args)
T time_since_epoch(T... args)
T to_string(T... args)
T unlock(T... args)
T value_or(T... args)
T what(T... args)