// rippled — NetworkOPs.cpp
// (web-viewer chrome "Loading... / Searching... / No Matches" removed)
1#include <xrpld/app/consensus/RCLConsensus.h>
2#include <xrpld/app/consensus/RCLValidations.h>
3#include <xrpld/app/ledger/AcceptedLedger.h>
4#include <xrpld/app/ledger/InboundLedgers.h>
5#include <xrpld/app/ledger/LedgerMaster.h>
6#include <xrpld/app/ledger/LedgerToJson.h>
7#include <xrpld/app/ledger/LocalTxs.h>
8#include <xrpld/app/ledger/OpenLedger.h>
9#include <xrpld/app/ledger/OrderBookDB.h>
10#include <xrpld/app/ledger/TransactionMaster.h>
11#include <xrpld/app/main/LoadManager.h>
12#include <xrpld/app/main/Tuning.h>
13#include <xrpld/app/misc/AmendmentTable.h>
14#include <xrpld/app/misc/DeliverMax.h>
15#include <xrpld/app/misc/HashRouter.h>
16#include <xrpld/app/misc/LoadFeeTrack.h>
17#include <xrpld/app/misc/NetworkOPs.h>
18#include <xrpld/app/misc/Transaction.h>
19#include <xrpld/app/misc/TxQ.h>
20#include <xrpld/app/misc/ValidatorKeys.h>
21#include <xrpld/app/misc/ValidatorList.h>
22#include <xrpld/app/misc/detail/AccountTxPaging.h>
23#include <xrpld/app/rdb/backend/SQLiteDatabase.h>
24#include <xrpld/app/tx/apply.h>
25#include <xrpld/consensus/Consensus.h>
26#include <xrpld/consensus/ConsensusParms.h>
27#include <xrpld/overlay/Cluster.h>
28#include <xrpld/overlay/Overlay.h>
29#include <xrpld/overlay/predicates.h>
30#include <xrpld/rpc/BookChanges.h>
31#include <xrpld/rpc/CTID.h>
32#include <xrpld/rpc/DeliveredAmount.h>
33#include <xrpld/rpc/MPTokenIssuanceID.h>
34#include <xrpld/rpc/ServerHandler.h>
35
36#include <xrpl/basics/UptimeClock.h>
37#include <xrpl/basics/mulDiv.h>
38#include <xrpl/basics/safe_cast.h>
39#include <xrpl/basics/scope.h>
40#include <xrpl/beast/utility/rngfill.h>
41#include <xrpl/core/PerfLog.h>
42#include <xrpl/crypto/RFC1751.h>
43#include <xrpl/crypto/csprng.h>
44#include <xrpl/protocol/BuildInfo.h>
45#include <xrpl/protocol/Feature.h>
46#include <xrpl/protocol/MultiApiJson.h>
47#include <xrpl/protocol/NFTSyntheticSerializer.h>
48#include <xrpl/protocol/RPCErr.h>
49#include <xrpl/protocol/TxFlags.h>
50#include <xrpl/protocol/jss.h>
51#include <xrpl/resource/Fees.h>
52#include <xrpl/resource/ResourceManager.h>
53
54#include <boost/asio/ip/host_name.hpp>
55#include <boost/asio/steady_timer.hpp>
56
57#include <algorithm>
58#include <exception>
59#include <mutex>
60#include <optional>
61#include <set>
62#include <sstream>
63#include <string>
64#include <tuple>
65#include <unordered_map>
66
67namespace xrpl {
68
69class NetworkOPsImp final : public NetworkOPs
70{
76 {
77 public:
79 bool const admin;
80 bool const local;
82 bool applied = false;
84
87 bool a,
88 bool l,
89 FailHard f)
90 : transaction(t), admin(a), local(l), failType(f)
91 {
92 XRPL_ASSERT(
94 "xrpl::NetworkOPsImp::TransactionStatus::TransactionStatus : "
95 "valid inputs");
96 }
97 };
98
102 enum class DispatchState : unsigned char {
103 none,
104 scheduled,
105 running,
106 };
107
109
125 {
133
137 std::chrono::steady_clock::time_point start_ =
139 std::chrono::steady_clock::time_point const processStart_ = start_;
142
143 public:
145 {
147 .transitions = 1;
148 }
149
156 void
158
164 void
165 json(Json::Value& obj) const;
166
168 {
170 decltype(mode_) mode;
171 decltype(start_) start;
173 };
174
177 {
180 }
181 };
182
185 {
186 ServerFeeSummary() = default;
187
189 XRPAmount fee,
190 TxQ::Metrics&& escalationMetrics,
191 LoadFeeTrack const& loadFeeTrack);
192 bool
193 operator!=(ServerFeeSummary const& b) const;
194
195 bool
197 {
198 return !(*this != b);
199 }
200
205 };
206
207public:
209 Application& app,
211 bool standalone,
212 std::size_t minPeerCount,
213 bool start_valid,
214 JobQueue& job_queue,
216 ValidatorKeys const& validatorKeys,
217 boost::asio::io_context& io_svc,
218 beast::Journal journal,
219 beast::insight::Collector::ptr const& collector)
220 : app_(app)
221 , m_journal(journal)
224 , heartbeatTimer_(io_svc)
225 , clusterTimer_(io_svc)
226 , accountHistoryTxTimer_(io_svc)
227 , mConsensus(
228 app,
230 setup_FeeVote(app_.config().section("voting")),
231 app_.logs().journal("FeeVote")),
233 *m_localTX,
234 app.getInboundTransactions(),
235 beast::get_abstract_clock<std::chrono::steady_clock>(),
236 validatorKeys,
237 app_.logs().journal("LedgerConsensus"))
238 , validatorPK_(
239 validatorKeys.keys ? validatorKeys.keys->publicKey
240 : decltype(validatorPK_){})
242 validatorKeys.keys ? validatorKeys.keys->masterPublicKey
243 : decltype(validatorMasterPK_){})
245 , m_job_queue(job_queue)
246 , m_standalone(standalone)
247 , minPeerCount_(start_valid ? 0 : minPeerCount)
248 , m_stats(std::bind(&NetworkOPsImp::collect_metrics, this), collector)
249 {
250 }
251
252 ~NetworkOPsImp() override
253 {
254 // This clear() is necessary to ensure the shared_ptrs in this map get
255 // destroyed NOW because the objects in this map invoke methods on this
256 // class when they are destroyed
258 }
259
260public:
262 getOperatingMode() const override;
263
265 strOperatingMode(OperatingMode const mode, bool const admin) const override;
266
268 strOperatingMode(bool const admin = false) const override;
269
270 //
271 // Transaction operations.
272 //
273
274 // Must complete immediately.
275 void
277
278 void
280 std::shared_ptr<Transaction>& transaction,
281 bool bUnlimited,
282 bool bLocal,
283 FailHard failType) override;
284
285 void
286 processTransactionSet(CanonicalTXSet const& set) override;
287
296 void
299 bool bUnlimited,
300 FailHard failType);
301
311 void
314 bool bUnlimited,
315 FailHard failtype);
316
317private:
318 bool
320
321 void
324 std::function<bool(std::unique_lock<std::mutex> const&)> retryCallback);
325
326public:
330 void
332
338 void
340
341 //
342 // Owner functions.
343 //
344
348 AccountID const& account) override;
349
350 //
351 // Book functions.
352 //
353
354 void
357 Book const&,
358 AccountID const& uTakerID,
359 bool const bProof,
360 unsigned int iLimit,
361 Json::Value const& jvMarker,
362 Json::Value& jvResult) override;
363
364 // Ledger proposal/close functions.
365 bool
367
368 bool
371 std::string const& source) override;
372
373 void
374 mapComplete(std::shared_ptr<SHAMap> const& map, bool fromAcquire) override;
375
376 // Network state machine.
377
378 // Used for the "jump" case.
379private:
380 void
382 bool
384
385public:
386 bool
388 uint256 const& networkClosed,
389 std::unique_ptr<std::stringstream> const& clog) override;
390 void
392 void
393 setStandAlone() override;
394
398 void
399 setStateTimer() override;
400
401 void
402 setNeedNetworkLedger() override;
403 void
404 clearNeedNetworkLedger() override;
405 bool
406 isNeedNetworkLedger() override;
407 bool
408 isFull() override;
409
410 void
411 setMode(OperatingMode om) override;
412
413 bool
414 isBlocked() override;
415 bool
416 isAmendmentBlocked() override;
417 void
418 setAmendmentBlocked() override;
419 bool
420 isAmendmentWarned() override;
421 void
422 setAmendmentWarned() override;
423 void
424 clearAmendmentWarned() override;
425 bool
426 isUNLBlocked() override;
427 void
428 setUNLBlocked() override;
429 void
430 clearUNLBlocked() override;
431 void
432 consensusViewChange() override;
433
435 getConsensusInfo() override;
437 getServerInfo(bool human, bool admin, bool counters) override;
438 void
439 clearLedgerFetch() override;
441 getLedgerFetchInfo() override;
444 std::optional<std::chrono::milliseconds> consensusDelay) override;
445 void
446 reportFeeChange() override;
447 void
449
450 void
451 updateLocalTx(ReadView const& view) override;
453 getLocalTxCount() override;
454
455 //
456 // Monitoring: publisher side.
457 //
458 void
459 pubLedger(std::shared_ptr<ReadView const> const& lpAccepted) override;
460 void
463 std::shared_ptr<STTx const> const& transaction,
464 TER result) override;
465 void
466 pubValidation(std::shared_ptr<STValidation> const& val) override;
467
468 //--------------------------------------------------------------------------
469 //
470 // InfoSub::Source.
471 //
472 void
474 InfoSub::ref ispListener,
475 hash_set<AccountID> const& vnaAccountIDs,
476 bool rt) override;
477 void
479 InfoSub::ref ispListener,
480 hash_set<AccountID> const& vnaAccountIDs,
481 bool rt) override;
482
483 // Just remove the subscription from the tracking
484 // not from the InfoSub. Needed for InfoSub destruction
485 void
487 std::uint64_t seq,
488 hash_set<AccountID> const& vnaAccountIDs,
489 bool rt) override;
490
492 subAccountHistory(InfoSub::ref ispListener, AccountID const& account)
493 override;
494 void
496 InfoSub::ref ispListener,
497 AccountID const& account,
498 bool historyOnly) override;
499
500 void
502 std::uint64_t seq,
503 AccountID const& account,
504 bool historyOnly) override;
505
506 bool
507 subLedger(InfoSub::ref ispListener, Json::Value& jvResult) override;
508 bool
509 unsubLedger(std::uint64_t uListener) override;
510
511 bool
512 subBookChanges(InfoSub::ref ispListener) override;
513 bool
514 unsubBookChanges(std::uint64_t uListener) override;
515
516 bool
517 subServer(InfoSub::ref ispListener, Json::Value& jvResult, bool admin)
518 override;
519 bool
520 unsubServer(std::uint64_t uListener) override;
521
522 bool
523 subBook(InfoSub::ref ispListener, Book const&) override;
524 bool
525 unsubBook(std::uint64_t uListener, Book const&) override;
526
527 bool
528 subManifests(InfoSub::ref ispListener) override;
529 bool
530 unsubManifests(std::uint64_t uListener) override;
531 void
532 pubManifest(Manifest const&) override;
533
534 bool
535 subTransactions(InfoSub::ref ispListener) override;
536 bool
537 unsubTransactions(std::uint64_t uListener) override;
538
539 bool
540 subRTTransactions(InfoSub::ref ispListener) override;
541 bool
542 unsubRTTransactions(std::uint64_t uListener) override;
543
544 bool
545 subValidations(InfoSub::ref ispListener) override;
546 bool
547 unsubValidations(std::uint64_t uListener) override;
548
549 bool
550 subPeerStatus(InfoSub::ref ispListener) override;
551 bool
552 unsubPeerStatus(std::uint64_t uListener) override;
553 void
554 pubPeerStatus(std::function<Json::Value(void)> const&) override;
555
556 bool
557 subConsensus(InfoSub::ref ispListener) override;
558 bool
559 unsubConsensus(std::uint64_t uListener) override;
560
562 findRpcSub(std::string const& strUrl) override;
564 addRpcSub(std::string const& strUrl, InfoSub::ref) override;
565 bool
566 tryRemoveRpcSub(std::string const& strUrl) override;
567
568 void
569 stop() override
570 {
571 {
572 try
573 {
574 heartbeatTimer_.cancel();
575 }
576 catch (boost::system::system_error const& e)
577 {
578 JLOG(m_journal.error())
579 << "NetworkOPs: heartbeatTimer cancel error: " << e.what();
580 }
581
582 try
583 {
584 clusterTimer_.cancel();
585 }
586 catch (boost::system::system_error const& e)
587 {
588 JLOG(m_journal.error())
589 << "NetworkOPs: clusterTimer cancel error: " << e.what();
590 }
591
592 try
593 {
594 accountHistoryTxTimer_.cancel();
595 }
596 catch (boost::system::system_error const& e)
597 {
598 JLOG(m_journal.error())
599 << "NetworkOPs: accountHistoryTxTimer cancel error: "
600 << e.what();
601 }
602 }
603 // Make sure that any waitHandlers pending in our timers are done.
604 using namespace std::chrono_literals;
605 waitHandlerCounter_.join("NetworkOPs", 1s, m_journal);
606 }
607
608 void
609 stateAccounting(Json::Value& obj) override;
610
611private:
612 void
613 setTimer(
614 boost::asio::steady_timer& timer,
615 std::chrono::milliseconds const& expiry_time,
616 std::function<void()> onExpire,
617 std::function<void()> onError);
618 void
620 void
622 void
624 void
626
628 transJson(
629 std::shared_ptr<STTx const> const& transaction,
630 TER result,
631 bool validated,
634
635 void
638 AcceptedLedgerTx const& transaction,
639 bool last);
640
641 void
644 AcceptedLedgerTx const& transaction,
645 bool last);
646
647 void
650 std::shared_ptr<STTx const> const& transaction,
651 TER result);
652
653 void
654 pubServer();
655 void
657
659 getHostId(bool forAdmin);
660
661private:
665
666 /*
667 * With a validated ledger to separate history and future, the node
668 * streams historical txns with negative indexes starting from -1,
669 * and streams future txns starting from index 0.
670 * The SubAccountHistoryIndex struct maintains these indexes.
671 * It also has a flag stopHistorical_ for stopping streaming
672 * the historical txns.
673 */
710
714 void
718 void
720 void
722
725
727
729
731
736
738 boost::asio::steady_timer heartbeatTimer_;
739 boost::asio::steady_timer clusterTimer_;
740 boost::asio::steady_timer accountHistoryTxTimer_;
741
743
746
748
750
753
755
757
758 enum SubTypes {
759 sLedger, // Accepted ledgers.
760 sManifests, // Received validator manifests.
761 sServer, // When server changes connectivity state.
762 sTransactions, // All accepted transactions.
763 sRTTransactions, // All proposed and accepted transactions.
764 sValidations, // Received validations.
765 sPeerStatus, // Peer status changes.
766 sConsensusPhase, // Consensus phase
767 sBookChanges, // Per-ledger order book changes
768 sLastEntry // Any new entry must be ADDED ABOVE this one
769 };
770
772
774
776
777 // Whether we are in standalone mode.
778 bool const m_standalone;
779
780 // The number of nodes that we need to consider ourselves connected.
782
783 // Transaction batching.
788
790
793
794private:
795 struct Stats
796 {
797 template <class Handler>
799 Handler const& handler,
800 beast::insight::Collector::ptr const& collector)
801 : hook(collector->make_hook(handler))
802 , disconnected_duration(collector->make_gauge(
803 "State_Accounting",
804 "Disconnected_duration"))
805 , connected_duration(collector->make_gauge(
806 "State_Accounting",
807 "Connected_duration"))
809 collector->make_gauge("State_Accounting", "Syncing_duration"))
810 , tracking_duration(collector->make_gauge(
811 "State_Accounting",
812 "Tracking_duration"))
814 collector->make_gauge("State_Accounting", "Full_duration"))
815 , disconnected_transitions(collector->make_gauge(
816 "State_Accounting",
817 "Disconnected_transitions"))
818 , connected_transitions(collector->make_gauge(
819 "State_Accounting",
820 "Connected_transitions"))
821 , syncing_transitions(collector->make_gauge(
822 "State_Accounting",
823 "Syncing_transitions"))
824 , tracking_transitions(collector->make_gauge(
825 "State_Accounting",
826 "Tracking_transitions"))
828 collector->make_gauge("State_Accounting", "Full_transitions"))
829 {
830 }
831
838
844 };
845
846 std::mutex m_statsMutex; // Mutex to lock m_stats
848
849private:
850 void
852};
853
854//------------------------------------------------------------------------------
855
857 {"disconnected", "connected", "syncing", "tracking", "full"}};
858
860
868
869static auto const genesisAccountId = calcAccountID(
871 .first);
872
873//------------------------------------------------------------------------------
874inline OperatingMode
876{
877 return mMode;
878}
879
880inline std::string
881NetworkOPsImp::strOperatingMode(bool const admin /* = false */) const
882{
883 return strOperatingMode(mMode, admin);
884}
885
886inline void
891
892inline void
897
898inline void
903
904inline bool
909
910inline bool
915
918{
919 static std::string const hostname = boost::asio::ip::host_name();
920
921 if (forAdmin)
922 return hostname;
923
924 // For non-admin uses hash the node public key into a
925 // single RFC1751 word:
926 static std::string const shroudedHostId = [this]() {
927 auto const& id = app_.nodeIdentity();
928
929 return RFC1751::getWordFromBlob(id.first.data(), id.first.size());
930 }();
931
932 return shroudedHostId;
933}
934
935void
937{
939
940 // Only do this work if a cluster is configured
941 if (app_.cluster().size() != 0)
943}
944
945void
947 boost::asio::steady_timer& timer,
948 std::chrono::milliseconds const& expiry_time,
949 std::function<void()> onExpire,
950 std::function<void()> onError)
951{
952 // Only start the timer if waitHandlerCounter_ is not yet joined.
953 if (auto optionalCountedHandler = waitHandlerCounter_.wrap(
954 [this, onExpire, onError](boost::system::error_code const& e) {
955 if ((e.value() == boost::system::errc::success) &&
956 (!m_job_queue.isStopped()))
957 {
958 onExpire();
959 }
960 // Recover as best we can if an unexpected error occurs.
961 if (e.value() != boost::system::errc::success &&
962 e.value() != boost::asio::error::operation_aborted)
963 {
964 // Try again later and hope for the best.
965 JLOG(m_journal.error())
966 << "Timer got error '" << e.message()
967 << "'. Restarting timer.";
968 onError();
969 }
970 }))
971 {
972 timer.expires_after(expiry_time);
973 timer.async_wait(std::move(*optionalCountedHandler));
974 }
975}
976
977void
978NetworkOPsImp::setHeartbeatTimer()
979{
980 setTimer(
981 heartbeatTimer_,
982 mConsensus.parms().ledgerGRANULARITY,
983 [this]() {
984 m_job_queue.addJob(jtNETOP_TIMER, "NetOPs.heartbeat", [this]() {
985 processHeartbeatTimer();
986 });
987 },
988 [this]() { setHeartbeatTimer(); });
989}
990
991void
992NetworkOPsImp::setClusterTimer()
993{
994 using namespace std::chrono_literals;
995
996 setTimer(
997 clusterTimer_,
998 10s,
999 [this]() {
1000 m_job_queue.addJob(jtNETOP_CLUSTER, "NetOPs.cluster", [this]() {
1001 processClusterTimer();
1002 });
1003 },
1004 [this]() { setClusterTimer(); });
1005}
1006
1007void
1008NetworkOPsImp::setAccountHistoryJobTimer(SubAccountHistoryInfoWeak subInfo)
1009{
1010 JLOG(m_journal.debug()) << "Scheduling AccountHistory job for account "
1011 << toBase58(subInfo.index_->accountId_);
1012 using namespace std::chrono_literals;
1013 setTimer(
1014 accountHistoryTxTimer_,
1015 4s,
1016 [this, subInfo]() { addAccountHistoryJob(subInfo); },
1017 [this, subInfo]() { setAccountHistoryJobTimer(subInfo); });
1018}
1019
1020void
1021NetworkOPsImp::processHeartbeatTimer()
1022{
1023 RclConsensusLogger clog(
1024 "Heartbeat Timer", mConsensus.validating(), m_journal);
1025 {
1026 std::unique_lock lock{app_.getMasterMutex()};
1027
1028 // VFALCO NOTE This is for diagnosing a crash on exit
1029 LoadManager& mgr(app_.getLoadManager());
1030 mgr.heartbeat();
1031
1032 std::size_t const numPeers = app_.overlay().size();
1033
1034 // do we have sufficient peers? If not, we are disconnected.
1035 if (numPeers < minPeerCount_)
1036 {
1037 if (mMode != OperatingMode::DISCONNECTED)
1038 {
1039 setMode(OperatingMode::DISCONNECTED);
1041 ss << "Node count (" << numPeers << ") has fallen "
1042 << "below required minimum (" << minPeerCount_ << ").";
1043 JLOG(m_journal.warn()) << ss.str();
1044 CLOG(clog.ss()) << "set mode to DISCONNECTED: " << ss.str();
1045 }
1046 else
1047 {
1048 CLOG(clog.ss())
1049 << "already DISCONNECTED. too few peers (" << numPeers
1050 << "), need at least " << minPeerCount_;
1051 }
1052
1053 // MasterMutex lock need not be held to call setHeartbeatTimer()
1054 lock.unlock();
1055 // We do not call mConsensus.timerEntry until there are enough
1056 // peers providing meaningful inputs to consensus
1057 setHeartbeatTimer();
1058
1059 return;
1060 }
1061
1062 if (mMode == OperatingMode::DISCONNECTED)
1063 {
1064 setMode(OperatingMode::CONNECTED);
1065 JLOG(m_journal.info())
1066 << "Node count (" << numPeers << ") is sufficient.";
1067 CLOG(clog.ss()) << "setting mode to CONNECTED based on " << numPeers
1068 << " peers. ";
1069 }
1070
1071 // Check if the last validated ledger forces a change between these
1072 // states.
1073 auto origMode = mMode.load();
1074 CLOG(clog.ss()) << "mode: " << strOperatingMode(origMode, true);
1075 if (mMode == OperatingMode::SYNCING)
1076 setMode(OperatingMode::SYNCING);
1077 else if (mMode == OperatingMode::CONNECTED)
1078 setMode(OperatingMode::CONNECTED);
1079 auto newMode = mMode.load();
1080 if (origMode != newMode)
1081 {
1082 CLOG(clog.ss())
1083 << ", changing to " << strOperatingMode(newMode, true);
1084 }
1085 CLOG(clog.ss()) << ". ";
1086 }
1087
1088 mConsensus.timerEntry(app_.timeKeeper().closeTime(), clog.ss());
1089
1090 CLOG(clog.ss()) << "consensus phase " << to_string(mLastConsensusPhase);
1091 ConsensusPhase const currPhase = mConsensus.phase();
1092 if (mLastConsensusPhase != currPhase)
1093 {
1094 reportConsensusStateChange(currPhase);
1095 mLastConsensusPhase = currPhase;
1096 CLOG(clog.ss()) << " changed to " << to_string(mLastConsensusPhase);
1097 }
1098 CLOG(clog.ss()) << ". ";
1099
1100 setHeartbeatTimer();
1101}
1102
1103void
1104NetworkOPsImp::processClusterTimer()
1105{
1106 if (app_.cluster().size() == 0)
1107 return;
1108
1109 using namespace std::chrono_literals;
1110
1111 bool const update = app_.cluster().update(
1112 app_.nodeIdentity().first,
1113 "",
1114 (m_ledgerMaster.getValidatedLedgerAge() <= 4min)
1115 ? app_.getFeeTrack().getLocalFee()
1116 : 0,
1117 app_.timeKeeper().now());
1118
1119 if (!update)
1120 {
1121 JLOG(m_journal.debug()) << "Too soon to send cluster update";
1122 setClusterTimer();
1123 return;
1124 }
1125
1126 protocol::TMCluster cluster;
1127 app_.cluster().for_each([&cluster](ClusterNode const& node) {
1128 protocol::TMClusterNode& n = *cluster.add_clusternodes();
1129 n.set_publickey(toBase58(TokenType::NodePublic, node.identity()));
1130 n.set_reporttime(node.getReportTime().time_since_epoch().count());
1131 n.set_nodeload(node.getLoadFee());
1132 if (!node.name().empty())
1133 n.set_nodename(node.name());
1134 });
1135
1136 Resource::Gossip gossip = app_.getResourceManager().exportConsumers();
1137 for (auto& item : gossip.items)
1138 {
1139 protocol::TMLoadSource& node = *cluster.add_loadsources();
1140 node.set_name(to_string(item.address));
1141 node.set_cost(item.balance);
1142 }
1143 app_.overlay().foreach(send_if(
1144 std::make_shared<Message>(cluster, protocol::mtCLUSTER),
1145 peer_in_cluster()));
1146 setClusterTimer();
1147}
1148
1149//------------------------------------------------------------------------------
1150
1152NetworkOPsImp::strOperatingMode(OperatingMode const mode, bool const admin)
1153 const
1154{
1155 if (mode == OperatingMode::FULL && admin)
1156 {
1157 auto const consensusMode = mConsensus.mode();
1158 if (consensusMode != ConsensusMode::wrongLedger)
1159 {
1160 if (consensusMode == ConsensusMode::proposing)
1161 return "proposing";
1162
1163 if (mConsensus.validating())
1164 return "validating";
1165 }
1166 }
1167
1168 return states_[static_cast<std::size_t>(mode)];
1169}
1170
1171void
1172NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)
1173{
1174 if (isNeedNetworkLedger())
1175 {
1176 // Nothing we can do if we've never been in sync
1177 return;
1178 }
1179
1180 // Enforce Network bar for batch txn
1181 if (iTrans->isFlag(tfInnerBatchTxn) &&
1182 m_ledgerMaster.getValidatedRules().enabled(featureBatch))
1183 {
1184 JLOG(m_journal.error())
1185 << "Submitted transaction invalid: tfInnerBatchTxn flag present.";
1186 return;
1187 }
1188
1189 // this is an asynchronous interface
1190 auto const trans = sterilize(*iTrans);
1191
1192 auto const txid = trans->getTransactionID();
1193 auto const flags = app_.getHashRouter().getFlags(txid);
1194
1195 if ((flags & HashRouterFlags::BAD) != HashRouterFlags::UNDEFINED)
1196 {
1197 JLOG(m_journal.warn()) << "Submitted transaction cached bad";
1198 return;
1199 }
1200
1201 try
1202 {
1203 auto const [validity, reason] = checkValidity(
1204 app_.getHashRouter(),
1205 *trans,
1206 m_ledgerMaster.getValidatedRules(),
1207 app_.config());
1208
1209 if (validity != Validity::Valid)
1210 {
1211 JLOG(m_journal.warn())
1212 << "Submitted transaction invalid: " << reason;
1213 return;
1214 }
1215 }
1216 catch (std::exception const& ex)
1217 {
1218 JLOG(m_journal.warn())
1219 << "Exception checking transaction " << txid << ": " << ex.what();
1220
1221 return;
1222 }
1223
1224 std::string reason;
1225
1226 auto tx = std::make_shared<Transaction>(trans, reason, app_);
1227
1228 m_job_queue.addJob(jtTRANSACTION, "submitTxn", [this, tx]() {
1229 auto t = tx;
1230 processTransaction(t, false, false, FailHard::no);
1231 });
1232}
1233
1234bool
1235NetworkOPsImp::preProcessTransaction(std::shared_ptr<Transaction>& transaction)
1236{
1237 auto const newFlags = app_.getHashRouter().getFlags(transaction->getID());
1238
1239 if ((newFlags & HashRouterFlags::BAD) != HashRouterFlags::UNDEFINED)
1240 {
1241 // cached bad
1242 JLOG(m_journal.warn()) << transaction->getID() << ": cached bad!\n";
1243 transaction->setStatus(INVALID);
1244 transaction->setResult(temBAD_SIGNATURE);
1245 return false;
1246 }
1247
1248 auto const view = m_ledgerMaster.getCurrentLedger();
1249
1250 // This function is called by several different parts of the codebase
1251 // under no circumstances will we ever accept an inner txn within a batch
1252 // txn from the network.
1253 auto const sttx = *transaction->getSTransaction();
1254 if (sttx.isFlag(tfInnerBatchTxn) && view->rules().enabled(featureBatch))
1255 {
1256 transaction->setStatus(INVALID);
1257 transaction->setResult(temINVALID_FLAG);
1258 app_.getHashRouter().setFlags(
1259 transaction->getID(), HashRouterFlags::BAD);
1260 return false;
1261 }
1262
1263 // NOTE ximinez - I think this check is redundant,
1264 // but I'm not 100% sure yet.
1265 // If so, only cost is looking up HashRouter flags.
1266 auto const [validity, reason] =
1267 checkValidity(app_.getHashRouter(), sttx, view->rules(), app_.config());
1268 XRPL_ASSERT(
1269 validity == Validity::Valid,
1270 "xrpl::NetworkOPsImp::processTransaction : valid validity");
1271
1272 // Not concerned with local checks at this point.
1273 if (validity == Validity::SigBad)
1274 {
1275 JLOG(m_journal.info()) << "Transaction has bad signature: " << reason;
1276 transaction->setStatus(INVALID);
1277 transaction->setResult(temBAD_SIGNATURE);
1278 app_.getHashRouter().setFlags(
1279 transaction->getID(), HashRouterFlags::BAD);
1280 return false;
1281 }
1282
1283 // canonicalize can change our pointer
1284 app_.getMasterTransaction().canonicalize(&transaction);
1285
1286 return true;
1287}
1288
1289void
1290NetworkOPsImp::processTransaction(
1291 std::shared_ptr<Transaction>& transaction,
1292 bool bUnlimited,
1293 bool bLocal,
1294 FailHard failType)
1295{
1296 auto ev = m_job_queue.makeLoadEvent(jtTXN_PROC, "ProcessTXN");
1297
1298 // preProcessTransaction can change our pointer
1299 if (!preProcessTransaction(transaction))
1300 return;
1301
1302 if (bLocal)
1303 doTransactionSync(transaction, bUnlimited, failType);
1304 else
1305 doTransactionAsync(transaction, bUnlimited, failType);
1306}
1307
1308void
1309NetworkOPsImp::doTransactionAsync(
1310 std::shared_ptr<Transaction> transaction,
1311 bool bUnlimited,
1312 FailHard failType)
1313{
1314 std::lock_guard lock(mMutex);
1315
1316 if (transaction->getApplying())
1317 return;
1318
1319 mTransactions.push_back(
1320 TransactionStatus(transaction, bUnlimited, false, failType));
1321 transaction->setApplying();
1322
1323 if (mDispatchState == DispatchState::none)
1324 {
1325 if (m_job_queue.addJob(
1326 jtBATCH, "transactionBatch", [this]() { transactionBatch(); }))
1327 {
1328 mDispatchState = DispatchState::scheduled;
1329 }
1330 }
1331}
1332
1333void
1334NetworkOPsImp::doTransactionSync(
1335 std::shared_ptr<Transaction> transaction,
1336 bool bUnlimited,
1337 FailHard failType)
1338{
1339 std::unique_lock<std::mutex> lock(mMutex);
1340
1341 if (!transaction->getApplying())
1342 {
1343 mTransactions.push_back(
1344 TransactionStatus(transaction, bUnlimited, true, failType));
1345 transaction->setApplying();
1346 }
1347
1348 doTransactionSyncBatch(
1349 lock, [&transaction](std::unique_lock<std::mutex> const&) {
1350 return transaction->getApplying();
1351 });
1352}
1353
1354void
1355NetworkOPsImp::doTransactionSyncBatch(
1357 std::function<bool(std::unique_lock<std::mutex> const&)> retryCallback)
1358{
1359 do
1360 {
1361 if (mDispatchState == DispatchState::running)
1362 {
1363 // A batch processing job is already running, so wait.
1364 mCond.wait(lock);
1365 }
1366 else
1367 {
1368 apply(lock);
1369
1370 if (mTransactions.size())
1371 {
1372 // More transactions need to be applied, but by another job.
1373 if (m_job_queue.addJob(jtBATCH, "transactionBatch", [this]() {
1374 transactionBatch();
1375 }))
1376 {
1377 mDispatchState = DispatchState::scheduled;
1378 }
1379 }
1380 }
1381 } while (retryCallback(lock));
1382}
1383
1384void
1385NetworkOPsImp::processTransactionSet(CanonicalTXSet const& set)
1386{
1387 auto ev = m_job_queue.makeLoadEvent(jtTXN_PROC, "ProcessTXNSet");
1389 candidates.reserve(set.size());
1390 for (auto const& [_, tx] : set)
1391 {
1392 std::string reason;
1393 auto transaction = std::make_shared<Transaction>(tx, reason, app_);
1394
1395 if (transaction->getStatus() == INVALID)
1396 {
1397 if (!reason.empty())
1398 {
1399 JLOG(m_journal.trace())
1400 << "Exception checking transaction: " << reason;
1401 }
1402 app_.getHashRouter().setFlags(
1403 tx->getTransactionID(), HashRouterFlags::BAD);
1404 continue;
1405 }
1406
1407 // preProcessTransaction can change our pointer
1408 if (!preProcessTransaction(transaction))
1409 continue;
1410
1411 candidates.emplace_back(transaction);
1412 }
1413
1414 std::vector<TransactionStatus> transactions;
1415 transactions.reserve(candidates.size());
1416
1417 std::unique_lock lock(mMutex);
1418
1419 for (auto& transaction : candidates)
1420 {
1421 if (!transaction->getApplying())
1422 {
1423 transactions.emplace_back(transaction, false, false, FailHard::no);
1424 transaction->setApplying();
1425 }
1426 }
1427
1428 if (mTransactions.empty())
1429 mTransactions.swap(transactions);
1430 else
1431 {
1432 mTransactions.reserve(mTransactions.size() + transactions.size());
1433 for (auto& t : transactions)
1434 mTransactions.push_back(std::move(t));
1435 }
1436 if (mTransactions.empty())
1437 {
1438 JLOG(m_journal.debug()) << "No transaction to process!";
1439 return;
1440 }
1441
1442 doTransactionSyncBatch(lock, [&](std::unique_lock<std::mutex> const&) {
1443 XRPL_ASSERT(
1444 lock.owns_lock(),
1445 "xrpl::NetworkOPsImp::processTransactionSet has lock");
1446 return std::any_of(
1447 mTransactions.begin(), mTransactions.end(), [](auto const& t) {
1448 return t.transaction->getApplying();
1449 });
1450 });
1451}
1452
1453void
1454NetworkOPsImp::transactionBatch()
1455{
1456 std::unique_lock<std::mutex> lock(mMutex);
1457
1458 if (mDispatchState == DispatchState::running)
1459 return;
1460
1461 while (mTransactions.size())
1462 {
1463 apply(lock);
1464 }
1465}
1466
1467void
1468NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
1469{
1471 std::vector<TransactionStatus> transactions;
1472 mTransactions.swap(transactions);
1473 XRPL_ASSERT(
1474 !transactions.empty(),
1475 "xrpl::NetworkOPsImp::apply : non-empty transactions");
1476 XRPL_ASSERT(
1477 mDispatchState != DispatchState::running,
1478 "xrpl::NetworkOPsImp::apply : is not running");
1479
1480 mDispatchState = DispatchState::running;
1481
1482 batchLock.unlock();
1483
1484 {
1485 std::unique_lock masterLock{app_.getMasterMutex(), std::defer_lock};
1486 bool changed = false;
1487 {
1488 std::unique_lock ledgerLock{
1489 m_ledgerMaster.peekMutex(), std::defer_lock};
1490 std::lock(masterLock, ledgerLock);
1491
1492 app_.openLedger().modify([&](OpenView& view, beast::Journal j) {
1493 for (TransactionStatus& e : transactions)
1494 {
1495 // we check before adding to the batch
1496 ApplyFlags flags = tapNONE;
1497 if (e.admin)
1498 flags |= tapUNLIMITED;
1499
1500 if (e.failType == FailHard::yes)
1501 flags |= tapFAIL_HARD;
1502
1503 auto const result = app_.getTxQ().apply(
1504 app_, view, e.transaction->getSTransaction(), flags, j);
1505 e.result = result.ter;
1506 e.applied = result.applied;
1507 changed = changed || result.applied;
1508 }
1509 return changed;
1510 });
1511 }
1512 if (changed)
1513 reportFeeChange();
1514
1515 std::optional<LedgerIndex> validatedLedgerIndex;
1516 if (auto const l = m_ledgerMaster.getValidatedLedger())
1517 validatedLedgerIndex = l->header().seq;
1518
1519 auto newOL = app_.openLedger().current();
1520 for (TransactionStatus& e : transactions)
1521 {
1522 e.transaction->clearSubmitResult();
1523
1524 if (e.applied)
1525 {
1526 pubProposedTransaction(
1527 newOL, e.transaction->getSTransaction(), e.result);
1528 e.transaction->setApplied();
1529 }
1530
1531 e.transaction->setResult(e.result);
1532
1533 if (isTemMalformed(e.result))
1534 app_.getHashRouter().setFlags(
1535 e.transaction->getID(), HashRouterFlags::BAD);
1536
1537#ifdef DEBUG
1538 if (e.result != tesSUCCESS)
1539 {
1540 std::string token, human;
1541
1542 if (transResultInfo(e.result, token, human))
1543 {
1544 JLOG(m_journal.info())
1545 << "TransactionResult: " << token << ": " << human;
1546 }
1547 }
1548#endif
1549
1550 bool addLocal = e.local;
1551
1552 if (e.result == tesSUCCESS)
1553 {
1554 JLOG(m_journal.debug())
1555 << "Transaction is now included in open ledger";
1556 e.transaction->setStatus(INCLUDED);
1557
1558 // Pop as many "reasonable" transactions for this account as
1559 // possible. "Reasonable" means they have sequential sequence
1560 // numbers, or use tickets.
1561 auto const& txCur = e.transaction->getSTransaction();
1562
1563 std::size_t count = 0;
1564 for (auto txNext = m_ledgerMaster.popAcctTransaction(txCur);
1565 txNext && count < maxPoppedTransactions;
1566 txNext = m_ledgerMaster.popAcctTransaction(txCur), ++count)
1567 {
1568 if (!batchLock.owns_lock())
1569 batchLock.lock();
1570 std::string reason;
1571 auto const trans = sterilize(*txNext);
1572 auto t = std::make_shared<Transaction>(trans, reason, app_);
1573 if (t->getApplying())
1574 break;
1575 submit_held.emplace_back(t, false, false, FailHard::no);
1576 t->setApplying();
1577 }
1578 if (batchLock.owns_lock())
1579 batchLock.unlock();
1580 }
1581 else if (e.result == tefPAST_SEQ)
1582 {
1583 // duplicate or conflict
1584 JLOG(m_journal.info()) << "Transaction is obsolete";
1585 e.transaction->setStatus(OBSOLETE);
1586 }
1587 else if (e.result == terQUEUED)
1588 {
1589 JLOG(m_journal.debug())
1590 << "Transaction is likely to claim a"
1591 << " fee, but is queued until fee drops";
1592
1593 e.transaction->setStatus(HELD);
1594 // Add to held transactions, because it could get
1595 // kicked out of the queue, and this will try to
1596 // put it back.
1597 m_ledgerMaster.addHeldTransaction(e.transaction);
1598 e.transaction->setQueued();
1599 e.transaction->setKept();
1600 }
1601 else if (
1602 isTerRetry(e.result) || isTelLocal(e.result) ||
1603 isTefFailure(e.result))
1604 {
1605 if (e.failType != FailHard::yes)
1606 {
1607 auto const lastLedgerSeq =
1608 e.transaction->getSTransaction()->at(
1609 ~sfLastLedgerSequence);
1610 auto const ledgersLeft = lastLedgerSeq
1611 ? *lastLedgerSeq -
1612 m_ledgerMaster.getCurrentLedgerIndex()
1614 // If any of these conditions are met, the transaction can
1615 // be held:
1616 // 1. It was submitted locally. (Note that this flag is only
1617 // true on the initial submission.)
1618 // 2. The transaction has a LastLedgerSequence, and the
1619 // LastLedgerSequence is fewer than LocalTxs::holdLedgers
1620 // (5) ledgers into the future. (Remember that an
1621 // unseated optional compares as less than all seated
1622 // values, so it has to be checked explicitly first.)
1623 // 3. The HashRouterFlags::BAD flag is not set on the txID.
1624 // (setFlags
1625 // checks before setting. If the flag is set, it returns
1626 // false, which means it's been held once without one of
1627 // the other conditions, so don't hold it again. Time's
1628 // up!)
1629 //
1630 if (e.local ||
1631 (ledgersLeft && ledgersLeft <= LocalTxs::holdLedgers) ||
1632 app_.getHashRouter().setFlags(
1633 e.transaction->getID(), HashRouterFlags::HELD))
1634 {
1635 // transaction should be held
1636 JLOG(m_journal.debug())
1637 << "Transaction should be held: " << e.result;
1638 e.transaction->setStatus(HELD);
1639 m_ledgerMaster.addHeldTransaction(e.transaction);
1640 e.transaction->setKept();
1641 }
1642 else
1643 JLOG(m_journal.debug())
1644 << "Not holding transaction "
1645 << e.transaction->getID() << ": "
1646 << (e.local ? "local" : "network") << ", "
1647 << "result: " << e.result << " ledgers left: "
1648 << (ledgersLeft ? to_string(*ledgersLeft)
1649 : "unspecified");
1650 }
1651 }
1652 else
1653 {
1654 JLOG(m_journal.debug())
1655 << "Status other than success " << e.result;
1656 e.transaction->setStatus(INVALID);
1657 }
1658
1659 auto const enforceFailHard =
1660 e.failType == FailHard::yes && !isTesSuccess(e.result);
1661
1662 if (addLocal && !enforceFailHard)
1663 {
1664 m_localTX->push_back(
1665 m_ledgerMaster.getCurrentLedgerIndex(),
1666 e.transaction->getSTransaction());
1667 e.transaction->setKept();
1668 }
1669
1670 if ((e.applied ||
1671 ((mMode != OperatingMode::FULL) &&
1672 (e.failType != FailHard::yes) && e.local) ||
1673 (e.result == terQUEUED)) &&
1674 !enforceFailHard)
1675 {
1676 auto const toSkip =
1677 app_.getHashRouter().shouldRelay(e.transaction->getID());
1678 if (auto const sttx = *(e.transaction->getSTransaction());
1679 toSkip &&
1680 // Skip relaying if it's an inner batch txn. The flag should
1681 // only be set if the Batch feature is enabled. If Batch is
1682 // not enabled, the flag is always invalid, so don't relay
1683 // it regardless.
1684 !(sttx.isFlag(tfInnerBatchTxn)))
1685 {
1686 protocol::TMTransaction tx;
1687 Serializer s;
1688
1689 sttx.add(s);
1690 tx.set_rawtransaction(s.data(), s.size());
1691 tx.set_status(protocol::tsCURRENT);
1692 tx.set_receivetimestamp(
1693 app_.timeKeeper().now().time_since_epoch().count());
1694 tx.set_deferred(e.result == terQUEUED);
1695 // FIXME: This should be when we received it
1696 app_.overlay().relay(e.transaction->getID(), tx, *toSkip);
1697 e.transaction->setBroadcast();
1698 }
1699 }
1700
1701 if (validatedLedgerIndex)
1702 {
1703 auto [fee, accountSeq, availableSeq] =
1704 app_.getTxQ().getTxRequiredFeeAndSeq(
1705 *newOL, e.transaction->getSTransaction());
1706 e.transaction->setCurrentLedgerState(
1707 *validatedLedgerIndex, fee, accountSeq, availableSeq);
1708 }
1709 }
1710 }
1711
1712 batchLock.lock();
1713
1714 for (TransactionStatus& e : transactions)
1715 e.transaction->clearApplying();
1716
1717 if (!submit_held.empty())
1718 {
1719 if (mTransactions.empty())
1720 mTransactions.swap(submit_held);
1721 else
1722 {
1723 mTransactions.reserve(mTransactions.size() + submit_held.size());
1724 for (auto& e : submit_held)
1725 mTransactions.push_back(std::move(e));
1726 }
1727 }
1728
1729 mCond.notify_all();
1730
1731 mDispatchState = DispatchState::none;
1732}
1733
1734//
1735// Owner functions
1736//
1737
1739NetworkOPsImp::getOwnerInfo(
1741 AccountID const& account)
1742{
1743 Json::Value jvObjects(Json::objectValue);
1744 auto root = keylet::ownerDir(account);
1745 auto sleNode = lpLedger->read(keylet::page(root));
1746 if (sleNode)
1747 {
1748 std::uint64_t uNodeDir;
1749
1750 do
1751 {
1752 for (auto const& uDirEntry : sleNode->getFieldV256(sfIndexes))
1753 {
1754 auto sleCur = lpLedger->read(keylet::child(uDirEntry));
1755 XRPL_ASSERT(
1756 sleCur,
1757 "xrpl::NetworkOPsImp::getOwnerInfo : non-null child SLE");
1758
1759 switch (sleCur->getType())
1760 {
1761 case ltOFFER:
1762 if (!jvObjects.isMember(jss::offers))
1763 jvObjects[jss::offers] =
1765
1766 jvObjects[jss::offers].append(
1767 sleCur->getJson(JsonOptions::none));
1768 break;
1769
1770 case ltRIPPLE_STATE:
1771 if (!jvObjects.isMember(jss::ripple_lines))
1772 {
1773 jvObjects[jss::ripple_lines] =
1775 }
1776
1777 jvObjects[jss::ripple_lines].append(
1778 sleCur->getJson(JsonOptions::none));
1779 break;
1780
1781 case ltACCOUNT_ROOT:
1782 case ltDIR_NODE:
1783 // LCOV_EXCL_START
1784 default:
1785 UNREACHABLE(
1786 "xrpl::NetworkOPsImp::getOwnerInfo : invalid "
1787 "type");
1788 break;
1789 // LCOV_EXCL_STOP
1790 }
1791 }
1792
1793 uNodeDir = sleNode->getFieldU64(sfIndexNext);
1794
1795 if (uNodeDir)
1796 {
1797 sleNode = lpLedger->read(keylet::page(root, uNodeDir));
1798 XRPL_ASSERT(
1799 sleNode,
1800 "xrpl::NetworkOPsImp::getOwnerInfo : read next page");
1801 }
1802 } while (uNodeDir);
1803 }
1804
1805 return jvObjects;
1806}
1807
1808//
1809// Other
1810//
1811
1812inline bool
1813NetworkOPsImp::isBlocked()
1814{
1815 return isAmendmentBlocked() || isUNLBlocked();
1816}
1817
inline bool
NetworkOPsImp::isAmendmentBlocked()
{
    // Reports the flag latched by setAmendmentBlocked().
    return amendmentBlocked_;
}
1823
void
NetworkOPsImp::setAmendmentBlocked()
{
    // Latch the flag first: setMode() consults isBlocked() and clamps any
    // mode above CONNECTED back down while the server is blocked.
    amendmentBlocked_ = true;
    setMode(OperatingMode::CONNECTED);
}
1830
1831inline bool
1832NetworkOPsImp::isAmendmentWarned()
1833{
1834 return !amendmentBlocked_ && amendmentWarned_;
1835}
1836
inline void
NetworkOPsImp::setAmendmentWarned()
{
    // Latch the warning flag; cleared by clearAmendmentWarned().
    amendmentWarned_ = true;
}
1842
inline void
NetworkOPsImp::clearAmendmentWarned()
{
    // Reset the warning flag set by setAmendmentWarned().
    amendmentWarned_ = false;
}
1848
inline bool
NetworkOPsImp::isUNLBlocked()
{
    // Reports the flag latched by setUNLBlocked().
    return unlBlocked_;
}
1854
void
NetworkOPsImp::setUNLBlocked()
{
    // Latch the flag first: setMode() consults isBlocked() and clamps any
    // mode above CONNECTED back down while the server is blocked.
    unlBlocked_ = true;
    setMode(OperatingMode::CONNECTED);
}
1861
inline void
NetworkOPsImp::clearUNLBlocked()
{
    // Reset the flag set by setUNLBlocked().
    unlBlocked_ = false;
}
1867
1868bool
1869NetworkOPsImp::checkLastClosedLedger(
1870 Overlay::PeerSequence const& peerList,
1871 uint256& networkClosed)
1872{
1873 // Returns true if there's an *abnormal* ledger issue, normal changing in
1874 // TRACKING mode should return false. Do we have sufficient validations for
1875 // our last closed ledger? Or do sufficient nodes agree? And do we have no
1876 // better ledger available? If so, we are either tracking or full.
1877
1878 JLOG(m_journal.trace()) << "NetworkOPsImp::checkLastClosedLedger";
1879
1880 auto const ourClosed = m_ledgerMaster.getClosedLedger();
1881
1882 if (!ourClosed)
1883 return false;
1884
1885 uint256 closedLedger = ourClosed->header().hash;
1886 uint256 prevClosedLedger = ourClosed->header().parentHash;
1887 JLOG(m_journal.trace()) << "OurClosed: " << closedLedger;
1888 JLOG(m_journal.trace()) << "PrevClosed: " << prevClosedLedger;
1889
1890 //-------------------------------------------------------------------------
1891 // Determine preferred last closed ledger
1892
1893 auto& validations = app_.getValidations();
1894 JLOG(m_journal.debug())
1895 << "ValidationTrie " << Json::Compact(validations.getJsonTrie());
1896
1897 // Will rely on peer LCL if no trusted validations exist
1899 peerCounts[closedLedger] = 0;
1900 if (mMode >= OperatingMode::TRACKING)
1901 peerCounts[closedLedger]++;
1902
1903 for (auto& peer : peerList)
1904 {
1905 uint256 peerLedger = peer->getClosedLedgerHash();
1906
1907 if (peerLedger.isNonZero())
1908 ++peerCounts[peerLedger];
1909 }
1910
1911 for (auto const& it : peerCounts)
1912 JLOG(m_journal.debug()) << "L: " << it.first << " n=" << it.second;
1913
1914 uint256 preferredLCL = validations.getPreferredLCL(
1915 RCLValidatedLedger{ourClosed, validations.adaptor().journal()},
1916 m_ledgerMaster.getValidLedgerIndex(),
1917 peerCounts);
1918
1919 bool switchLedgers = preferredLCL != closedLedger;
1920 if (switchLedgers)
1921 closedLedger = preferredLCL;
1922 //-------------------------------------------------------------------------
1923 if (switchLedgers && (closedLedger == prevClosedLedger))
1924 {
1925 // don't switch to our own previous ledger
1926 JLOG(m_journal.info()) << "We won't switch to our own previous ledger";
1927 networkClosed = ourClosed->header().hash;
1928 switchLedgers = false;
1929 }
1930 else
1931 networkClosed = closedLedger;
1932
1933 if (!switchLedgers)
1934 return false;
1935
1936 auto consensus = m_ledgerMaster.getLedgerByHash(closedLedger);
1937
1938 if (!consensus)
1939 consensus = app_.getInboundLedgers().acquire(
1940 closedLedger, 0, InboundLedger::Reason::CONSENSUS);
1941
1942 if (consensus &&
1943 (!m_ledgerMaster.canBeCurrent(consensus) ||
1944 !m_ledgerMaster.isCompatible(
1945 *consensus, m_journal.debug(), "Not switching")))
1946 {
1947 // Don't switch to a ledger not on the validated chain
1948 // or with an invalid close time or sequence
1949 networkClosed = ourClosed->header().hash;
1950 return false;
1951 }
1952
1953 JLOG(m_journal.warn()) << "We are not running on the consensus ledger";
1954 JLOG(m_journal.info()) << "Our LCL: " << ourClosed->header().hash
1955 << getJson({*ourClosed, {}});
1956 JLOG(m_journal.info()) << "Net LCL " << closedLedger;
1957
1958 if ((mMode == OperatingMode::TRACKING) || (mMode == OperatingMode::FULL))
1959 {
1960 setMode(OperatingMode::CONNECTED);
1961 }
1962
1963 if (consensus)
1964 {
1965 // FIXME: If this rewinds the ledger sequence, or has the same
1966 // sequence, we should update the status on any stored transactions
1967 // in the invalidated ledgers.
1968 switchLastClosedLedger(consensus);
1969 }
1970
1971 return true;
1972}
1973
1974void
1975NetworkOPsImp::switchLastClosedLedger(
1976 std::shared_ptr<Ledger const> const& newLCL)
1977{
1978 // set the newLCL as our last closed ledger -- this is abnormal code
1979 JLOG(m_journal.error())
1980 << "JUMP last closed ledger to " << newLCL->header().hash;
1981
1982 clearNeedNetworkLedger();
1983
1984 // Update fee computations.
1985 app_.getTxQ().processClosedLedger(app_, *newLCL, true);
1986
1987 // Caller must own master lock
1988 {
1989 // Apply tx in old open ledger to new
1990 // open ledger. Then apply local tx.
1991
1992 auto retries = m_localTX->getTxSet();
1993 auto const lastVal = app_.getLedgerMaster().getValidatedLedger();
1995 if (lastVal)
1996 rules = makeRulesGivenLedger(*lastVal, app_.config().features);
1997 else
1998 rules.emplace(app_.config().features);
1999 app_.openLedger().accept(
2000 app_,
2001 *rules,
2002 newLCL,
2003 OrderedTxs({}),
2004 false,
2005 retries,
2006 tapNONE,
2007 "jump",
2008 [&](OpenView& view, beast::Journal j) {
2009 // Stuff the ledger with transactions from the queue.
2010 return app_.getTxQ().accept(app_, view);
2011 });
2012 }
2013
2014 m_ledgerMaster.switchLCL(newLCL);
2015
2016 protocol::TMStatusChange s;
2017 s.set_newevent(protocol::neSWITCHED_LEDGER);
2018 s.set_ledgerseq(newLCL->header().seq);
2019 s.set_networktime(app_.timeKeeper().now().time_since_epoch().count());
2020 s.set_ledgerhashprevious(
2021 newLCL->header().parentHash.begin(),
2022 newLCL->header().parentHash.size());
2023 s.set_ledgerhash(
2024 newLCL->header().hash.begin(), newLCL->header().hash.size());
2025
2026 app_.overlay().foreach(
2027 send_always(std::make_shared<Message>(s, protocol::mtSTATUS_CHANGE)));
2028}
2029
2030bool
2031NetworkOPsImp::beginConsensus(
2032 uint256 const& networkClosed,
2034{
2035 XRPL_ASSERT(
2036 networkClosed.isNonZero(),
2037 "xrpl::NetworkOPsImp::beginConsensus : nonzero input");
2038
2039 auto closingInfo = m_ledgerMaster.getCurrentLedger()->header();
2040
2041 JLOG(m_journal.info()) << "Consensus time for #" << closingInfo.seq
2042 << " with LCL " << closingInfo.parentHash;
2043
2044 auto prevLedger = m_ledgerMaster.getLedgerByHash(closingInfo.parentHash);
2045
2046 if (!prevLedger)
2047 {
2048 // this shouldn't happen unless we jump ledgers
2049 if (mMode == OperatingMode::FULL)
2050 {
2051 JLOG(m_journal.warn()) << "Don't have LCL, going to tracking";
2052 setMode(OperatingMode::TRACKING);
2053 CLOG(clog) << "beginConsensus Don't have LCL, going to tracking. ";
2054 }
2055
2056 CLOG(clog) << "beginConsensus no previous ledger. ";
2057 return false;
2058 }
2059
2060 XRPL_ASSERT(
2061 prevLedger->header().hash == closingInfo.parentHash,
2062 "xrpl::NetworkOPsImp::beginConsensus : prevLedger hash matches "
2063 "parent");
2064 XRPL_ASSERT(
2065 closingInfo.parentHash ==
2066 m_ledgerMaster.getClosedLedger()->header().hash,
2067 "xrpl::NetworkOPsImp::beginConsensus : closedLedger parent matches "
2068 "hash");
2069
2070 app_.validators().setNegativeUNL(prevLedger->negativeUNL());
2071 TrustChanges const changes = app_.validators().updateTrusted(
2072 app_.getValidations().getCurrentNodeIDs(),
2073 closingInfo.parentCloseTime,
2074 *this,
2075 app_.overlay(),
2076 app_.getHashRouter());
2077
2078 if (!changes.added.empty() || !changes.removed.empty())
2079 {
2080 app_.getValidations().trustChanged(changes.added, changes.removed);
2081 // Update the AmendmentTable so it tracks the current validators.
2082 app_.getAmendmentTable().trustChanged(
2083 app_.validators().getQuorumKeys().second);
2084 }
2085
2086 mConsensus.startRound(
2087 app_.timeKeeper().closeTime(),
2088 networkClosed,
2089 prevLedger,
2090 changes.removed,
2091 changes.added,
2092 clog);
2093
2094 ConsensusPhase const currPhase = mConsensus.phase();
2095 if (mLastConsensusPhase != currPhase)
2096 {
2097 reportConsensusStateChange(currPhase);
2098 mLastConsensusPhase = currPhase;
2099 }
2100
2101 JLOG(m_journal.debug()) << "Initiating consensus engine";
2102 return true;
2103}
2104
2105bool
2106NetworkOPsImp::processTrustedProposal(RCLCxPeerPos peerPos)
2107{
2108 auto const& peerKey = peerPos.publicKey();
2109 if (validatorPK_ == peerKey || validatorMasterPK_ == peerKey)
2110 {
2111 // Could indicate a operator misconfiguration where two nodes are
2112 // running with the same validator key configured, so this isn't fatal,
2113 // and it doesn't necessarily indicate peer misbehavior. But since this
2114 // is a trusted message, it could be a very big deal. Either way, we
2115 // don't want to relay the proposal. Note that the byzantine behavior
2116 // detection in handleNewValidation will notify other peers.
2117 //
2118 // Another, innocuous explanation is unusual message routing and delays,
2119 // causing this node to receive its own messages back.
2120 JLOG(m_journal.error())
2121 << "Received a proposal signed by MY KEY from a peer. This may "
2122 "indicate a misconfiguration where another node has the same "
2123 "validator key, or may be caused by unusual message routing and "
2124 "delays.";
2125 return false;
2126 }
2127
2128 return mConsensus.peerProposal(app_.timeKeeper().closeTime(), peerPos);
2129}
2130
2131void
2132NetworkOPsImp::mapComplete(std::shared_ptr<SHAMap> const& map, bool fromAcquire)
2133{
2134 // We now have an additional transaction set
2135 // either created locally during the consensus process
2136 // or acquired from a peer
2137
2138 // Inform peers we have this set
2139 protocol::TMHaveTransactionSet msg;
2140 msg.set_hash(map->getHash().as_uint256().begin(), 256 / 8);
2141 msg.set_status(protocol::tsHAVE);
2142 app_.overlay().foreach(
2143 send_always(std::make_shared<Message>(msg, protocol::mtHAVE_SET)));
2144
2145 // We acquired it because consensus asked us to
2146 if (fromAcquire)
2147 mConsensus.gotTxSet(app_.timeKeeper().closeTime(), RCLTxSet{map});
2148}
2149
// Wrap up a consensus round: retire stale peer LCL claims, possibly
// promote our operating mode, and begin the next round.
// NOTE: order matters below — the second mode check re-reads mMode after
// the first may have promoted it to TRACKING.
void
NetworkOPsImp::endConsensus(std::unique_ptr<std::stringstream> const& clog)
{
    uint256 deadLedger = m_ledgerMaster.getClosedLedger()->header().parentHash;

    // Peers still advertising the now-superseded ledger get their status
    // cycled.
    for (auto const& it : app_.overlay().getActivePeers())
    {
        if (it && (it->getClosedLedgerHash() == deadLedger))
        {
            JLOG(m_journal.trace()) << "Killing obsolete peer status";
            it->cycleStatus();
        }
    }

    uint256 networkClosed;
    bool ledgerChange =
        checkLastClosedLedger(app_.overlay().getActivePeers(), networkClosed);

    if (networkClosed.isZero())
    {
        CLOG(clog) << "endConsensus last closed ledger is zero. ";
        return;
    }

    // WRITEME: Unless we are in FULL and in the process of doing a consensus,
    // we must count how many nodes share our LCL, how many nodes disagree with
    // our LCL, and how many validations our LCL has. We also want to check
    // timing to make sure there shouldn't be a newer LCL. We need this
    // information to do the next three tests.

    if (((mMode == OperatingMode::CONNECTED) ||
         (mMode == OperatingMode::SYNCING)) &&
        !ledgerChange)
    {
        // Count number of peers that agree with us and UNL nodes whose
        // validations we have for LCL. If the ledger is good enough, go to
        // TRACKING - TODO
        if (!needNetworkLedger_)
            setMode(OperatingMode::TRACKING);
    }

    if (((mMode == OperatingMode::CONNECTED) ||
         (mMode == OperatingMode::TRACKING)) &&
        !ledgerChange)
    {
        // check if the ledger is good enough to go to FULL
        // Note: Do not go to FULL if we don't have the previous ledger
        // check if the ledger is bad enough to go to CONNECTED -- TODO
        auto current = m_ledgerMaster.getCurrentLedger();
        // Promote only if our clock is within two close-time resolutions of
        // the current ledger's parent close time.
        if (app_.timeKeeper().now() <
            (current->header().parentCloseTime +
             2 * current->header().closeTimeResolution))
        {
            setMode(OperatingMode::FULL);
        }
    }

    beginConsensus(networkClosed, clog);
}
2209
2210void
2211NetworkOPsImp::consensusViewChange()
2212{
2213 if ((mMode == OperatingMode::FULL) || (mMode == OperatingMode::TRACKING))
2214 {
2215 setMode(OperatingMode::CONNECTED);
2216 }
2217}
2218
2219void
2220NetworkOPsImp::pubManifest(Manifest const& mo)
2221{
2222 // VFALCO consider std::shared_mutex
2223 std::lock_guard sl(mSubLock);
2224
2225 if (!mStreamMaps[sManifests].empty())
2226 {
2228
2229 jvObj[jss::type] = "manifestReceived";
2230 jvObj[jss::master_key] = toBase58(TokenType::NodePublic, mo.masterKey);
2231 if (mo.signingKey)
2232 jvObj[jss::signing_key] =
2233 toBase58(TokenType::NodePublic, *mo.signingKey);
2234 jvObj[jss::seq] = Json::UInt(mo.sequence);
2235 if (auto sig = mo.getSignature())
2236 jvObj[jss::signature] = strHex(*sig);
2237 jvObj[jss::master_signature] = strHex(mo.getMasterSignature());
2238 if (!mo.domain.empty())
2239 jvObj[jss::domain] = mo.domain;
2240 jvObj[jss::manifest] = strHex(mo.serialized);
2241
2242 for (auto i = mStreamMaps[sManifests].begin();
2243 i != mStreamMaps[sManifests].end();)
2244 {
2245 if (auto p = i->second.lock())
2246 {
2247 p->send(jvObj, true);
2248 ++i;
2249 }
2250 else
2251 {
2252 i = mStreamMaps[sManifests].erase(i);
2253 }
2254 }
2255 }
2256}
2257
// Snapshot the current fee state: server load factor/base from the fee
// track, the ledger base fee, and the TxQ escalation metrics (moved into
// the optional `em`).
NetworkOPsImp::ServerFeeSummary::ServerFeeSummary(
    XRPAmount fee,
    TxQ::Metrics&& escalationMetrics,
    LoadFeeTrack const& loadFeeTrack)
    : loadFactorServer{loadFeeTrack.getLoadFactor()}
    , loadBaseServer{loadFeeTrack.getLoadBase()}
    , baseFee{fee}
    , em{std::move(escalationMetrics)}
{
}
2268
2269bool
2271 NetworkOPsImp::ServerFeeSummary const& b) const
2272{
2273 if (loadFactorServer != b.loadFactorServer ||
2274 loadBaseServer != b.loadBaseServer || baseFee != b.baseFee ||
2275 em.has_value() != b.em.has_value())
2276 return true;
2277
2278 if (em && b.em)
2279 {
2280 return (
2281 em->minProcessingFeeLevel != b.em->minProcessingFeeLevel ||
2282 em->openLedgerFeeLevel != b.em->openLedgerFeeLevel ||
2283 em->referenceFeeLevel != b.em->referenceFeeLevel);
2284 }
2285
2286 return false;
2287}
2288
// Clamp a 64-bit value into the 32-bit range, because the JSON fields we
// publish are limited to unsigned 32-bit integers.
static std::uint32_t
trunc32(std::uint64_t v)
{
    constexpr std::uint64_t max32 = std::numeric_limits<std::uint32_t>::max();

    return static_cast<std::uint32_t>(std::min(max32, v));
}
2297
2298void
2300{
2301 // VFALCO TODO Don't hold the lock across calls to send...make a copy of the
2302 // list into a local array while holding the lock then release
2303 // the lock and call send on everyone.
2304 //
2306
2307 if (!mStreamMaps[sServer].empty())
2308 {
2310
2312 app_.openLedger().current()->fees().base,
2314 app_.getFeeTrack()};
2315
2316 jvObj[jss::type] = "serverStatus";
2317 jvObj[jss::server_status] = strOperatingMode();
2318 jvObj[jss::load_base] = f.loadBaseServer;
2319 jvObj[jss::load_factor_server] = f.loadFactorServer;
2320 jvObj[jss::base_fee] = f.baseFee.jsonClipped();
2321
2322 if (f.em)
2323 {
2324 auto const loadFactor = std::max(
2325 safe_cast<std::uint64_t>(f.loadFactorServer),
2326 mulDiv(
2327 f.em->openLedgerFeeLevel,
2328 f.loadBaseServer,
2329 f.em->referenceFeeLevel)
2331
2332 jvObj[jss::load_factor] = trunc32(loadFactor);
2333 jvObj[jss::load_factor_fee_escalation] =
2334 f.em->openLedgerFeeLevel.jsonClipped();
2335 jvObj[jss::load_factor_fee_queue] =
2336 f.em->minProcessingFeeLevel.jsonClipped();
2337 jvObj[jss::load_factor_fee_reference] =
2338 f.em->referenceFeeLevel.jsonClipped();
2339 }
2340 else
2341 jvObj[jss::load_factor] = f.loadFactorServer;
2342
2343 mLastFeeSummary = f;
2344
2345 for (auto i = mStreamMaps[sServer].begin();
2346 i != mStreamMaps[sServer].end();)
2347 {
2348 InfoSub::pointer p = i->second.lock();
2349
2350 // VFALCO TODO research the possibility of using thread queues and
2351 // linearizing the deletion of subscribers with the
2352 // sending of JSON data.
2353 if (p)
2354 {
2355 p->send(jvObj, true);
2356 ++i;
2357 }
2358 else
2359 {
2360 i = mStreamMaps[sServer].erase(i);
2361 }
2362 }
2363 }
2364}
2365
2366void
2368{
2370
2371 auto& streamMap = mStreamMaps[sConsensusPhase];
2372 if (!streamMap.empty())
2373 {
2375 jvObj[jss::type] = "consensusPhase";
2376 jvObj[jss::consensus] = to_string(phase);
2377
2378 for (auto i = streamMap.begin(); i != streamMap.end();)
2379 {
2380 if (auto p = i->second.lock())
2381 {
2382 p->send(jvObj, true);
2383 ++i;
2384 }
2385 else
2386 {
2387 i = streamMap.erase(i);
2388 }
2389 }
2390 }
2391}
2392
2393void
2395{
2396 // VFALCO consider std::shared_mutex
2398
2399 if (!mStreamMaps[sValidations].empty())
2400 {
2402
2403 auto const signerPublic = val->getSignerPublic();
2404
2405 jvObj[jss::type] = "validationReceived";
2406 jvObj[jss::validation_public_key] =
2407 toBase58(TokenType::NodePublic, signerPublic);
2408 jvObj[jss::ledger_hash] = to_string(val->getLedgerHash());
2409 jvObj[jss::signature] = strHex(val->getSignature());
2410 jvObj[jss::full] = val->isFull();
2411 jvObj[jss::flags] = val->getFlags();
2412 jvObj[jss::signing_time] = *(*val)[~sfSigningTime];
2413 jvObj[jss::data] = strHex(val->getSerializer().slice());
2414 jvObj[jss::network_id] = app_.config().NETWORK_ID;
2415
2416 if (auto version = (*val)[~sfServerVersion])
2417 jvObj[jss::server_version] = std::to_string(*version);
2418
2419 if (auto cookie = (*val)[~sfCookie])
2420 jvObj[jss::cookie] = std::to_string(*cookie);
2421
2422 if (auto hash = (*val)[~sfValidatedHash])
2423 jvObj[jss::validated_hash] = strHex(*hash);
2424
2425 auto const masterKey =
2426 app_.validatorManifests().getMasterKey(signerPublic);
2427
2428 if (masterKey != signerPublic)
2429 jvObj[jss::master_key] = toBase58(TokenType::NodePublic, masterKey);
2430
2431 // NOTE *seq is a number, but old API versions used string. We replace
2432 // number with a string using MultiApiJson near end of this function
2433 if (auto const seq = (*val)[~sfLedgerSequence])
2434 jvObj[jss::ledger_index] = *seq;
2435
2436 if (val->isFieldPresent(sfAmendments))
2437 {
2438 jvObj[jss::amendments] = Json::Value(Json::arrayValue);
2439 for (auto const& amendment : val->getFieldV256(sfAmendments))
2440 jvObj[jss::amendments].append(to_string(amendment));
2441 }
2442
2443 if (auto const closeTime = (*val)[~sfCloseTime])
2444 jvObj[jss::close_time] = *closeTime;
2445
2446 if (auto const loadFee = (*val)[~sfLoadFee])
2447 jvObj[jss::load_fee] = *loadFee;
2448
2449 if (auto const baseFee = val->at(~sfBaseFee))
2450 jvObj[jss::base_fee] = static_cast<double>(*baseFee);
2451
2452 if (auto const reserveBase = val->at(~sfReserveBase))
2453 jvObj[jss::reserve_base] = *reserveBase;
2454
2455 if (auto const reserveInc = val->at(~sfReserveIncrement))
2456 jvObj[jss::reserve_inc] = *reserveInc;
2457
2458 // (The ~ operator converts the Proxy to a std::optional, which
2459 // simplifies later operations)
2460 if (auto const baseFeeXRP = ~val->at(~sfBaseFeeDrops);
2461 baseFeeXRP && baseFeeXRP->native())
2462 jvObj[jss::base_fee] = baseFeeXRP->xrp().jsonClipped();
2463
2464 if (auto const reserveBaseXRP = ~val->at(~sfReserveBaseDrops);
2465 reserveBaseXRP && reserveBaseXRP->native())
2466 jvObj[jss::reserve_base] = reserveBaseXRP->xrp().jsonClipped();
2467
2468 if (auto const reserveIncXRP = ~val->at(~sfReserveIncrementDrops);
2469 reserveIncXRP && reserveIncXRP->native())
2470 jvObj[jss::reserve_inc] = reserveIncXRP->xrp().jsonClipped();
2471
2472 // NOTE Use MultiApiJson to publish two slightly different JSON objects
2473 // for consumers supporting different API versions
2474 MultiApiJson multiObj{jvObj};
2475 multiObj.visit(
2476 RPC::apiVersion<1>, //
2477 [](Json::Value& jvTx) {
2478 // Type conversion for older API versions to string
2479 if (jvTx.isMember(jss::ledger_index))
2480 {
2481 jvTx[jss::ledger_index] =
2482 std::to_string(jvTx[jss::ledger_index].asUInt());
2483 }
2484 });
2485
2486 for (auto i = mStreamMaps[sValidations].begin();
2487 i != mStreamMaps[sValidations].end();)
2488 {
2489 if (auto p = i->second.lock())
2490 {
2491 multiObj.visit(
2492 p->getApiVersion(), //
2493 [&](Json::Value const& jv) { p->send(jv, true); });
2494 ++i;
2495 }
2496 else
2497 {
2498 i = mStreamMaps[sValidations].erase(i);
2499 }
2500 }
2501 }
2502}
2503
2504void
2506{
2508
2509 if (!mStreamMaps[sPeerStatus].empty())
2510 {
2511 Json::Value jvObj(func());
2512
2513 jvObj[jss::type] = "peerStatusChange";
2514
2515 for (auto i = mStreamMaps[sPeerStatus].begin();
2516 i != mStreamMaps[sPeerStatus].end();)
2517 {
2518 InfoSub::pointer p = i->second.lock();
2519
2520 if (p)
2521 {
2522 p->send(jvObj, true);
2523 ++i;
2524 }
2525 else
2526 {
2527 i = mStreamMaps[sPeerStatus].erase(i);
2528 }
2529 }
2530 }
2531}
2532
2533void
2535{
2536 using namespace std::chrono_literals;
2537 if (om == OperatingMode::CONNECTED)
2538 {
2541 }
2542 else if (om == OperatingMode::SYNCING)
2543 {
2546 }
2547
2548 if ((om > OperatingMode::CONNECTED) && isBlocked())
2550
2551 if (mMode == om)
2552 return;
2553
2554 mMode = om;
2555
2556 accounting_.mode(om);
2557
2558 JLOG(m_journal.info()) << "STATE->" << strOperatingMode();
2559 pubServer();
2560}
2561
2562bool
2565 std::string const& source)
2566{
2567 JLOG(m_journal.trace())
2568 << "recvValidation " << val->getLedgerHash() << " from " << source;
2569
2571 BypassAccept bypassAccept = BypassAccept::no;
2572 try
2573 {
2574 if (pendingValidations_.contains(val->getLedgerHash()))
2575 bypassAccept = BypassAccept::yes;
2576 else
2577 pendingValidations_.insert(val->getLedgerHash());
2578 scope_unlock unlock(lock);
2579 handleNewValidation(app_, val, source, bypassAccept, m_journal);
2580 }
2581 catch (std::exception const& e)
2582 {
2583 JLOG(m_journal.warn())
2584 << "Exception thrown for handling new validation "
2585 << val->getLedgerHash() << ": " << e.what();
2586 }
2587 catch (...)
2588 {
2589 JLOG(m_journal.warn())
2590 << "Unknown exception thrown for handling new validation "
2591 << val->getLedgerHash();
2592 }
2593 if (bypassAccept == BypassAccept::no)
2594 {
2595 pendingValidations_.erase(val->getLedgerHash());
2596 }
2597 lock.unlock();
2598
2599 pubValidation(val);
2600
2601 JLOG(m_journal.debug()) << [this, &val]() -> auto {
2603 ss << "VALIDATION: " << val->render() << " master_key: ";
2604 auto master = app_.validators().getTrustedKey(val->getSignerPublic());
2605 if (master)
2606 {
2607 ss << toBase58(TokenType::NodePublic, *master);
2608 }
2609 else
2610 {
2611 ss << "none";
2612 }
2613 return ss.str();
2614 }();
2615
2616 // We will always relay trusted validations; if configured, we will
2617 // also relay all untrusted validations.
2618 return app_.config().RELAY_UNTRUSTED_VALIDATIONS == 1 || val->isTrusted();
2619}
2620
2623{
2624 return mConsensus.getJson(true);
2625}
2626
2628NetworkOPsImp::getServerInfo(bool human, bool admin, bool counters)
2629{
2631
2632 // System-level warnings
2633 {
2634 Json::Value warnings{Json::arrayValue};
2635 if (isAmendmentBlocked())
2636 {
2637 Json::Value& w = warnings.append(Json::objectValue);
2638 w[jss::id] = warnRPC_AMENDMENT_BLOCKED;
2639 w[jss::message] =
2640 "This server is amendment blocked, and must be updated to be "
2641 "able to stay in sync with the network.";
2642 }
2643 if (isUNLBlocked())
2644 {
2645 Json::Value& w = warnings.append(Json::objectValue);
2646 w[jss::id] = warnRPC_EXPIRED_VALIDATOR_LIST;
2647 w[jss::message] =
2648 "This server has an expired validator list. validators.txt "
2649 "may be incorrectly configured or some [validator_list_sites] "
2650 "may be unreachable.";
2651 }
2652 if (admin && isAmendmentWarned())
2653 {
2654 Json::Value& w = warnings.append(Json::objectValue);
2655 w[jss::id] = warnRPC_UNSUPPORTED_MAJORITY;
2656 w[jss::message] =
2657 "One or more unsupported amendments have reached majority. "
2658 "Upgrade to the latest version before they are activated "
2659 "to avoid being amendment blocked.";
2660 if (auto const expected =
2662 {
2663 auto& d = w[jss::details] = Json::objectValue;
2664 d[jss::expected_date] = expected->time_since_epoch().count();
2665 d[jss::expected_date_UTC] = to_string(*expected);
2666 }
2667 }
2668
2669 if (warnings.size())
2670 info[jss::warnings] = std::move(warnings);
2671 }
2672
2673 // hostid: unique string describing the machine
2674 if (human)
2675 info[jss::hostid] = getHostId(admin);
2676
2677 // domain: if configured with a domain, report it:
2678 if (!app_.config().SERVER_DOMAIN.empty())
2679 info[jss::server_domain] = app_.config().SERVER_DOMAIN;
2680
2681 info[jss::build_version] = BuildInfo::getVersionString();
2682
2683 info[jss::server_state] = strOperatingMode(admin);
2684
2685 info[jss::time] = to_string(std::chrono::floor<std::chrono::microseconds>(
2687
2689 info[jss::network_ledger] = "waiting";
2690
2691 info[jss::validation_quorum] =
2692 static_cast<Json::UInt>(app_.validators().quorum());
2693
2694 if (admin)
2695 {
2696 switch (app_.config().NODE_SIZE)
2697 {
2698 case 0:
2699 info[jss::node_size] = "tiny";
2700 break;
2701 case 1:
2702 info[jss::node_size] = "small";
2703 break;
2704 case 2:
2705 info[jss::node_size] = "medium";
2706 break;
2707 case 3:
2708 info[jss::node_size] = "large";
2709 break;
2710 case 4:
2711 info[jss::node_size] = "huge";
2712 break;
2713 }
2714
2715 auto when = app_.validators().expires();
2716
2717 if (!human)
2718 {
2719 if (when)
2720 info[jss::validator_list_expires] =
2721 safe_cast<Json::UInt>(when->time_since_epoch().count());
2722 else
2723 info[jss::validator_list_expires] = 0;
2724 }
2725 else
2726 {
2727 auto& x = (info[jss::validator_list] = Json::objectValue);
2728
2729 x[jss::count] = static_cast<Json::UInt>(app_.validators().count());
2730
2731 if (when)
2732 {
2733 if (*when == TimeKeeper::time_point::max())
2734 {
2735 x[jss::expiration] = "never";
2736 x[jss::status] = "active";
2737 }
2738 else
2739 {
2740 x[jss::expiration] = to_string(*when);
2741
2742 if (*when > app_.timeKeeper().now())
2743 x[jss::status] = "active";
2744 else
2745 x[jss::status] = "expired";
2746 }
2747 }
2748 else
2749 {
2750 x[jss::status] = "unknown";
2751 x[jss::expiration] = "unknown";
2752 }
2753 }
2754
2755#if defined(GIT_COMMIT_HASH) || defined(GIT_BRANCH)
2756 {
2757 auto& x = (info[jss::git] = Json::objectValue);
2758#ifdef GIT_COMMIT_HASH
2759 x[jss::hash] = GIT_COMMIT_HASH;
2760#endif
2761#ifdef GIT_BRANCH
2762 x[jss::branch] = GIT_BRANCH;
2763#endif
2764 }
2765#endif
2766 }
2767 info[jss::io_latency_ms] =
2768 static_cast<Json::UInt>(app_.getIOLatency().count());
2769
2770 if (admin)
2771 {
2772 if (auto const localPubKey = app_.validators().localPublicKey();
2773 localPubKey && app_.getValidationPublicKey())
2774 {
2775 info[jss::pubkey_validator] =
2776 toBase58(TokenType::NodePublic, localPubKey.value());
2777 }
2778 else
2779 {
2780 info[jss::pubkey_validator] = "none";
2781 }
2782 }
2783
2784 if (counters)
2785 {
2786 info[jss::counters] = app_.getPerfLog().countersJson();
2787
2788 Json::Value nodestore(Json::objectValue);
2789 app_.getNodeStore().getCountsJson(nodestore);
2790 info[jss::counters][jss::nodestore] = nodestore;
2791 info[jss::current_activities] = app_.getPerfLog().currentJson();
2792 }
2793
2794 info[jss::pubkey_node] =
2796
2797 info[jss::complete_ledgers] = app_.getLedgerMaster().getCompleteLedgers();
2798
2800 info[jss::amendment_blocked] = true;
2801
2802 auto const fp = m_ledgerMaster.getFetchPackCacheSize();
2803
2804 if (fp != 0)
2805 info[jss::fetch_pack] = Json::UInt(fp);
2806
2807 info[jss::peers] = Json::UInt(app_.overlay().size());
2808
2809 Json::Value lastClose = Json::objectValue;
2810 lastClose[jss::proposers] = Json::UInt(mConsensus.prevProposers());
2811
2812 if (human)
2813 {
2814 lastClose[jss::converge_time_s] =
2816 }
2817 else
2818 {
2819 lastClose[jss::converge_time] =
2821 }
2822
2823 info[jss::last_close] = lastClose;
2824
2825 // info[jss::consensus] = mConsensus.getJson();
2826
2827 if (admin)
2828 info[jss::load] = m_job_queue.getJson();
2829
2830 if (auto const netid = app_.overlay().networkID())
2831 info[jss::network_id] = static_cast<Json::UInt>(*netid);
2832
2833 auto const escalationMetrics =
2835
2836 auto const loadFactorServer = app_.getFeeTrack().getLoadFactor();
2837 auto const loadBaseServer = app_.getFeeTrack().getLoadBase();
2838 /* Scale the escalated fee level to unitless "load factor".
2839 In practice, this just strips the units, but it will continue
2840 to work correctly if either base value ever changes. */
2841 auto const loadFactorFeeEscalation =
2842 mulDiv(
2843 escalationMetrics.openLedgerFeeLevel,
2844 loadBaseServer,
2845 escalationMetrics.referenceFeeLevel)
2847
2848 auto const loadFactor = std::max(
2849 safe_cast<std::uint64_t>(loadFactorServer), loadFactorFeeEscalation);
2850
2851 if (!human)
2852 {
2853 info[jss::load_base] = loadBaseServer;
2854 info[jss::load_factor] = trunc32(loadFactor);
2855 info[jss::load_factor_server] = loadFactorServer;
2856
2857 /* Json::Value doesn't support uint64, so clamp to max
2858 uint32 value. This is mostly theoretical, since there
2859 probably isn't enough extant XRP to drive the factor
2860 that high.
2861 */
2862 info[jss::load_factor_fee_escalation] =
2863 escalationMetrics.openLedgerFeeLevel.jsonClipped();
2864 info[jss::load_factor_fee_queue] =
2865 escalationMetrics.minProcessingFeeLevel.jsonClipped();
2866 info[jss::load_factor_fee_reference] =
2867 escalationMetrics.referenceFeeLevel.jsonClipped();
2868 }
2869 else
2870 {
2871 info[jss::load_factor] =
2872 static_cast<double>(loadFactor) / loadBaseServer;
2873
2874 if (loadFactorServer != loadFactor)
2875 info[jss::load_factor_server] =
2876 static_cast<double>(loadFactorServer) / loadBaseServer;
2877
2878 if (admin)
2879 {
2881 if (fee != loadBaseServer)
2882 info[jss::load_factor_local] =
2883 static_cast<double>(fee) / loadBaseServer;
2884 fee = app_.getFeeTrack().getRemoteFee();
2885 if (fee != loadBaseServer)
2886 info[jss::load_factor_net] =
2887 static_cast<double>(fee) / loadBaseServer;
2888 fee = app_.getFeeTrack().getClusterFee();
2889 if (fee != loadBaseServer)
2890 info[jss::load_factor_cluster] =
2891 static_cast<double>(fee) / loadBaseServer;
2892 }
2893 if (escalationMetrics.openLedgerFeeLevel !=
2894 escalationMetrics.referenceFeeLevel &&
2895 (admin || loadFactorFeeEscalation != loadFactor))
2896 info[jss::load_factor_fee_escalation] =
2897 escalationMetrics.openLedgerFeeLevel.decimalFromReference(
2898 escalationMetrics.referenceFeeLevel);
2899 if (escalationMetrics.minProcessingFeeLevel !=
2900 escalationMetrics.referenceFeeLevel)
2901 info[jss::load_factor_fee_queue] =
2902 escalationMetrics.minProcessingFeeLevel.decimalFromReference(
2903 escalationMetrics.referenceFeeLevel);
2904 }
2905
2906 bool valid = false;
2907 auto lpClosed = m_ledgerMaster.getValidatedLedger();
2908
2909 if (lpClosed)
2910 valid = true;
2911 else
2912 lpClosed = m_ledgerMaster.getClosedLedger();
2913
2914 if (lpClosed)
2915 {
2916 XRPAmount const baseFee = lpClosed->fees().base;
2918 l[jss::seq] = Json::UInt(lpClosed->header().seq);
2919 l[jss::hash] = to_string(lpClosed->header().hash);
2920
2921 if (!human)
2922 {
2923 l[jss::base_fee] = baseFee.jsonClipped();
2924 l[jss::reserve_base] = lpClosed->fees().reserve.jsonClipped();
2925 l[jss::reserve_inc] = lpClosed->fees().increment.jsonClipped();
2926 l[jss::close_time] = Json::Value::UInt(
2927 lpClosed->header().closeTime.time_since_epoch().count());
2928 }
2929 else
2930 {
2931 l[jss::base_fee_xrp] = baseFee.decimalXRP();
2932 l[jss::reserve_base_xrp] = lpClosed->fees().reserve.decimalXRP();
2933 l[jss::reserve_inc_xrp] = lpClosed->fees().increment.decimalXRP();
2934
2935 if (auto const closeOffset = app_.timeKeeper().closeOffset();
2936 std::abs(closeOffset.count()) >= 60)
2937 l[jss::close_time_offset] =
2938 static_cast<std::uint32_t>(closeOffset.count());
2939
2940 constexpr std::chrono::seconds highAgeThreshold{1000000};
2942 {
2943 auto const age = m_ledgerMaster.getValidatedLedgerAge();
2944 l[jss::age] =
2945 Json::UInt(age < highAgeThreshold ? age.count() : 0);
2946 }
2947 else
2948 {
2949 auto lCloseTime = lpClosed->header().closeTime;
2950 auto closeTime = app_.timeKeeper().closeTime();
2951 if (lCloseTime <= closeTime)
2952 {
2953 using namespace std::chrono_literals;
2954 auto age = closeTime - lCloseTime;
2955 l[jss::age] =
2956 Json::UInt(age < highAgeThreshold ? age.count() : 0);
2957 }
2958 }
2959 }
2960
2961 if (valid)
2962 info[jss::validated_ledger] = l;
2963 else
2964 info[jss::closed_ledger] = l;
2965
2966 auto lpPublished = m_ledgerMaster.getPublishedLedger();
2967 if (!lpPublished)
2968 info[jss::published_ledger] = "none";
2969 else if (lpPublished->header().seq != lpClosed->header().seq)
2970 info[jss::published_ledger] = lpPublished->header().seq;
2971 }
2972
2973 accounting_.json(info);
2974 info[jss::uptime] = UptimeClock::now().time_since_epoch().count();
2975 info[jss::jq_trans_overflow] =
2977 info[jss::peer_disconnects] =
2979 info[jss::peer_disconnects_resources] =
2981
2982 // This array must be sorted in increasing order.
2983 static constexpr std::array<std::string_view, 7> protocols{
2984 "http", "https", "peer", "ws", "ws2", "wss", "wss2"};
2985 static_assert(std::is_sorted(std::begin(protocols), std::end(protocols)));
2986 {
2988 for (auto const& port : app_.getServerHandler().setup().ports)
2989 {
2990 // Don't publish admin ports for non-admin users
2991 if (!admin &&
2992 !(port.admin_nets_v4.empty() && port.admin_nets_v6.empty() &&
2993 port.admin_user.empty() && port.admin_password.empty()))
2994 continue;
2997 std::begin(port.protocol),
2998 std::end(port.protocol),
2999 std::begin(protocols),
3000 std::end(protocols),
3001 std::back_inserter(proto));
3002 if (!proto.empty())
3003 {
3004 auto& jv = ports.append(Json::Value(Json::objectValue));
3005 jv[jss::port] = std::to_string(port.port);
3006 jv[jss::protocol] = Json::Value{Json::arrayValue};
3007 for (auto const& p : proto)
3008 jv[jss::protocol].append(p);
3009 }
3010 }
3011
3012 if (app_.config().exists(SECTION_PORT_GRPC))
3013 {
3014 auto const& grpcSection = app_.config().section(SECTION_PORT_GRPC);
3015 auto const optPort = grpcSection.get("port");
3016 if (optPort && grpcSection.get("ip"))
3017 {
3018 auto& jv = ports.append(Json::Value(Json::objectValue));
3019 jv[jss::port] = *optPort;
3020 jv[jss::protocol] = Json::Value{Json::arrayValue};
3021 jv[jss::protocol].append("grpc");
3022 }
3023 }
3024 info[jss::ports] = std::move(ports);
3025 }
3026
3027 return info;
3028}
3029
3030void
3035
3041
3042void
3044 std::shared_ptr<ReadView const> const& ledger,
3045 std::shared_ptr<STTx const> const& transaction,
3046 TER result)
3047{
3048 // never publish an inner txn inside a batch txn. The flag should
3049 // only be set if the Batch feature is enabled. If Batch is not
3050 // enabled, the flag is always invalid, so don't publish it
3051 // regardless.
3052 if (transaction->isFlag(tfInnerBatchTxn))
3053 return;
3054
3055 MultiApiJson jvObj =
3056 transJson(transaction, result, false, ledger, std::nullopt);
3057
3058 {
3060
3061 auto it = mStreamMaps[sRTTransactions].begin();
3062 while (it != mStreamMaps[sRTTransactions].end())
3063 {
3064 InfoSub::pointer p = it->second.lock();
3065
3066 if (p)
3067 {
3068 jvObj.visit(
3069 p->getApiVersion(), //
3070 [&](Json::Value const& jv) { p->send(jv, true); });
3071 ++it;
3072 }
3073 else
3074 {
3075 it = mStreamMaps[sRTTransactions].erase(it);
3076 }
3077 }
3078 }
3079
3080 pubProposedAccountTransaction(ledger, transaction, result);
3081}
3082
3083void
3085{
3086 // Ledgers are published only when they acquire sufficient validations
3087 // Holes are filled across connection loss or other catastrophe
3088
3090 app_.getAcceptedLedgerCache().fetch(lpAccepted->header().hash);
3091 if (!alpAccepted)
3092 {
3093 alpAccepted = std::make_shared<AcceptedLedger>(lpAccepted, app_);
3094 app_.getAcceptedLedgerCache().canonicalize_replace_client(
3095 lpAccepted->header().hash, alpAccepted);
3096 }
3097
3098 XRPL_ASSERT(
3099 alpAccepted->getLedger().get() == lpAccepted.get(),
3100 "xrpl::NetworkOPsImp::pubLedger : accepted input");
3101
3102 {
3103 JLOG(m_journal.debug())
3104 << "Publishing ledger " << lpAccepted->header().seq << " "
3105 << lpAccepted->header().hash;
3106
3108
3109 if (!mStreamMaps[sLedger].empty())
3110 {
3112
3113 jvObj[jss::type] = "ledgerClosed";
3114 jvObj[jss::ledger_index] = lpAccepted->header().seq;
3115 jvObj[jss::ledger_hash] = to_string(lpAccepted->header().hash);
3116 jvObj[jss::ledger_time] = Json::Value::UInt(
3117 lpAccepted->header().closeTime.time_since_epoch().count());
3118
3119 jvObj[jss::network_id] = app_.config().NETWORK_ID;
3120
3121 if (!lpAccepted->rules().enabled(featureXRPFees))
3122 jvObj[jss::fee_ref] = Config::FEE_UNITS_DEPRECATED;
3123 jvObj[jss::fee_base] = lpAccepted->fees().base.jsonClipped();
3124 jvObj[jss::reserve_base] = lpAccepted->fees().reserve.jsonClipped();
3125 jvObj[jss::reserve_inc] =
3126 lpAccepted->fees().increment.jsonClipped();
3127
3128 jvObj[jss::txn_count] = Json::UInt(alpAccepted->size());
3129
3131 {
3132 jvObj[jss::validated_ledgers] =
3134 }
3135
3136 auto it = mStreamMaps[sLedger].begin();
3137 while (it != mStreamMaps[sLedger].end())
3138 {
3139 InfoSub::pointer p = it->second.lock();
3140 if (p)
3141 {
3142 p->send(jvObj, true);
3143 ++it;
3144 }
3145 else
3146 it = mStreamMaps[sLedger].erase(it);
3147 }
3148 }
3149
3150 if (!mStreamMaps[sBookChanges].empty())
3151 {
3152 Json::Value jvObj = xrpl::RPC::computeBookChanges(lpAccepted);
3153
3154 auto it = mStreamMaps[sBookChanges].begin();
3155 while (it != mStreamMaps[sBookChanges].end())
3156 {
3157 InfoSub::pointer p = it->second.lock();
3158 if (p)
3159 {
3160 p->send(jvObj, true);
3161 ++it;
3162 }
3163 else
3164 it = mStreamMaps[sBookChanges].erase(it);
3165 }
3166 }
3167
3168 {
3169 static bool firstTime = true;
3170 if (firstTime)
3171 {
3172 // First validated ledger, start delayed SubAccountHistory
3173 firstTime = false;
3174 for (auto& outer : mSubAccountHistory)
3175 {
3176 for (auto& inner : outer.second)
3177 {
3178 auto& subInfo = inner.second;
3179 if (subInfo.index_->separationLedgerSeq_ == 0)
3180 {
3182 alpAccepted->getLedger(), subInfo);
3183 }
3184 }
3185 }
3186 }
3187 }
3188 }
3189
3190 // Don't lock since pubAcceptedTransaction is locking.
3191 for (auto const& accTx : *alpAccepted)
3192 {
3193 JLOG(m_journal.trace()) << "pubAccepted: " << accTx->getJson();
3195 lpAccepted, *accTx, accTx == *(--alpAccepted->end()));
3196 }
3197}
3198
3199void
3201{
3203 app_.openLedger().current()->fees().base,
3205 app_.getFeeTrack()};
3206
3207 // only schedule the job if something has changed
3208 if (f != mLastFeeSummary)
3209 {
3211 jtCLIENT_FEE_CHANGE, "reportFeeChange->pubServer", [this]() {
3212 pubServer();
3213 });
3214 }
3215}
3216
3217void
3219{
3222 "reportConsensusStateChange->pubConsensus",
3223 [this, phase]() { pubConsensus(phase); });
3224}
3225
3226inline void
3228{
3229 m_localTX->sweep(view);
3230}
3231inline std::size_t
3233{
3234 return m_localTX->size();
3235}
3236
3237// This routine should only be used to publish accepted or validated
3238// transactions.
3241 std::shared_ptr<STTx const> const& transaction,
3242 TER result,
3243 bool validated,
3244 std::shared_ptr<ReadView const> const& ledger,
3246{
3248 std::string sToken;
3249 std::string sHuman;
3250
3251 transResultInfo(result, sToken, sHuman);
3252
3253 jvObj[jss::type] = "transaction";
3254 // NOTE jvObj is not a finished object for either API version. After
3255 // it's populated, we need to finish it for a specific API version. This is
3256 // done in a loop, near the end of this function.
3257 jvObj[jss::transaction] =
3258 transaction->getJson(JsonOptions::disable_API_prior_V2, false);
3259
3260 if (meta)
3261 {
3262 jvObj[jss::meta] = meta->get().getJson(JsonOptions::none);
3264 jvObj[jss::meta], *ledger, transaction, meta->get());
3265 RPC::insertNFTSyntheticInJson(jvObj, transaction, meta->get());
3267 jvObj[jss::meta], transaction, meta->get());
3268 }
3269
3270 // add CTID where the needed data for it exists
3271 if (auto const& lookup = ledger->txRead(transaction->getTransactionID());
3272 lookup.second && lookup.second->isFieldPresent(sfTransactionIndex))
3273 {
3274 uint32_t const txnSeq = lookup.second->getFieldU32(sfTransactionIndex);
3275 uint32_t netID = app_.config().NETWORK_ID;
3276 if (transaction->isFieldPresent(sfNetworkID))
3277 netID = transaction->getFieldU32(sfNetworkID);
3278
3280 RPC::encodeCTID(ledger->header().seq, txnSeq, netID);
3281 ctid)
3282 jvObj[jss::ctid] = *ctid;
3283 }
3284 if (!ledger->open())
3285 jvObj[jss::ledger_hash] = to_string(ledger->header().hash);
3286
3287 if (validated)
3288 {
3289 jvObj[jss::ledger_index] = ledger->header().seq;
3290 jvObj[jss::transaction][jss::date] =
3291 ledger->header().closeTime.time_since_epoch().count();
3292 jvObj[jss::validated] = true;
3293 jvObj[jss::close_time_iso] = to_string_iso(ledger->header().closeTime);
3294
3295 // WRITEME: Put the account next seq here
3296 }
3297 else
3298 {
3299 jvObj[jss::validated] = false;
3300 jvObj[jss::ledger_current_index] = ledger->header().seq;
3301 }
3302
3303 jvObj[jss::status] = validated ? "closed" : "proposed";
3304 jvObj[jss::engine_result] = sToken;
3305 jvObj[jss::engine_result_code] = result;
3306 jvObj[jss::engine_result_message] = sHuman;
3307
3308 if (transaction->getTxnType() == ttOFFER_CREATE)
3309 {
3310 auto const account = transaction->getAccountID(sfAccount);
3311 auto const amount = transaction->getFieldAmount(sfTakerGets);
3312
3313 // If the offer create is not self funded then add the owner balance
3314 if (account != amount.issue().account)
3315 {
3316 auto const ownerFunds = accountFunds(
3317 *ledger,
3318 account,
3319 amount,
3321 app_.journal("View"));
3322 jvObj[jss::transaction][jss::owner_funds] = ownerFunds.getText();
3323 }
3324 }
3325
3326 std::string const hash = to_string(transaction->getTransactionID());
3327 MultiApiJson multiObj{jvObj};
3329 multiObj.visit(), //
3330 [&]<unsigned Version>(
3332 RPC::insertDeliverMax(
3333 jvTx[jss::transaction], transaction->getTxnType(), Version);
3334
3335 if constexpr (Version > 1)
3336 {
3337 jvTx[jss::tx_json] = jvTx.removeMember(jss::transaction);
3338 jvTx[jss::hash] = hash;
3339 }
3340 else
3341 {
3342 jvTx[jss::transaction][jss::hash] = hash;
3343 }
3344 });
3345
3346 return multiObj;
3347}
3348
3349void
3351 std::shared_ptr<ReadView const> const& ledger,
3352 AcceptedLedgerTx const& transaction,
3353 bool last)
3354{
3355 auto const& stTxn = transaction.getTxn();
3356
3357 // Create two different Json objects, for different API versions
3358 auto const metaRef = std::ref(transaction.getMeta());
3359 auto const trResult = transaction.getResult();
3360 MultiApiJson jvObj = transJson(stTxn, trResult, true, ledger, metaRef);
3361
3362 {
3364
3365 auto it = mStreamMaps[sTransactions].begin();
3366 while (it != mStreamMaps[sTransactions].end())
3367 {
3368 InfoSub::pointer p = it->second.lock();
3369
3370 if (p)
3371 {
3372 jvObj.visit(
3373 p->getApiVersion(), //
3374 [&](Json::Value const& jv) { p->send(jv, true); });
3375 ++it;
3376 }
3377 else
3378 it = mStreamMaps[sTransactions].erase(it);
3379 }
3380
3381 it = mStreamMaps[sRTTransactions].begin();
3382
3383 while (it != mStreamMaps[sRTTransactions].end())
3384 {
3385 InfoSub::pointer p = it->second.lock();
3386
3387 if (p)
3388 {
3389 jvObj.visit(
3390 p->getApiVersion(), //
3391 [&](Json::Value const& jv) { p->send(jv, true); });
3392 ++it;
3393 }
3394 else
3395 it = mStreamMaps[sRTTransactions].erase(it);
3396 }
3397 }
3398
3399 if (transaction.getResult() == tesSUCCESS)
3400 app_.getOrderBookDB().processTxn(ledger, transaction, jvObj);
3401
3402 pubAccountTransaction(ledger, transaction, last);
3403}
3404
3405void
3407 std::shared_ptr<ReadView const> const& ledger,
3408 AcceptedLedgerTx const& transaction,
3409 bool last)
3410{
3412 int iProposed = 0;
3413 int iAccepted = 0;
3414
3415 std::vector<SubAccountHistoryInfo> accountHistoryNotify;
3416 auto const currLedgerSeq = ledger->seq();
3417 {
3419
3420 if (!mSubAccount.empty() || !mSubRTAccount.empty() ||
3422 {
3423 for (auto const& affectedAccount : transaction.getAffected())
3424 {
3425 if (auto simiIt = mSubRTAccount.find(affectedAccount);
3426 simiIt != mSubRTAccount.end())
3427 {
3428 auto it = simiIt->second.begin();
3429
3430 while (it != simiIt->second.end())
3431 {
3432 InfoSub::pointer p = it->second.lock();
3433
3434 if (p)
3435 {
3436 notify.insert(p);
3437 ++it;
3438 ++iProposed;
3439 }
3440 else
3441 it = simiIt->second.erase(it);
3442 }
3443 }
3444
3445 if (auto simiIt = mSubAccount.find(affectedAccount);
3446 simiIt != mSubAccount.end())
3447 {
3448 auto it = simiIt->second.begin();
3449 while (it != simiIt->second.end())
3450 {
3451 InfoSub::pointer p = it->second.lock();
3452
3453 if (p)
3454 {
3455 notify.insert(p);
3456 ++it;
3457 ++iAccepted;
3458 }
3459 else
3460 it = simiIt->second.erase(it);
3461 }
3462 }
3463
3464 if (auto historyIt = mSubAccountHistory.find(affectedAccount);
3465 historyIt != mSubAccountHistory.end())
3466 {
3467 auto& subs = historyIt->second;
3468 auto it = subs.begin();
3469 while (it != subs.end())
3470 {
3471 SubAccountHistoryInfoWeak const& info = it->second;
3472 if (currLedgerSeq <= info.index_->separationLedgerSeq_)
3473 {
3474 ++it;
3475 continue;
3476 }
3477
3478 if (auto isSptr = info.sinkWptr_.lock(); isSptr)
3479 {
3480 accountHistoryNotify.emplace_back(
3481 SubAccountHistoryInfo{isSptr, info.index_});
3482 ++it;
3483 }
3484 else
3485 {
3486 it = subs.erase(it);
3487 }
3488 }
3489 if (subs.empty())
3490 mSubAccountHistory.erase(historyIt);
3491 }
3492 }
3493 }
3494 }
3495
3496 JLOG(m_journal.trace())
3497 << "pubAccountTransaction: "
3498 << "proposed=" << iProposed << ", accepted=" << iAccepted;
3499
3500 if (!notify.empty() || !accountHistoryNotify.empty())
3501 {
3502 auto const& stTxn = transaction.getTxn();
3503
3504 // Create two different Json objects, for different API versions
3505 auto const metaRef = std::ref(transaction.getMeta());
3506 auto const trResult = transaction.getResult();
3507 MultiApiJson jvObj = transJson(stTxn, trResult, true, ledger, metaRef);
3508
3509 for (InfoSub::ref isrListener : notify)
3510 {
3511 jvObj.visit(
3512 isrListener->getApiVersion(), //
3513 [&](Json::Value const& jv) { isrListener->send(jv, true); });
3514 }
3515
3516 if (last)
3517 jvObj.set(jss::account_history_boundary, true);
3518
3519 XRPL_ASSERT(
3520 jvObj.isMember(jss::account_history_tx_stream) ==
3522 "xrpl::NetworkOPsImp::pubAccountTransaction : "
3523 "account_history_tx_stream not set");
3524 for (auto& info : accountHistoryNotify)
3525 {
3526 auto& index = info.index_;
3527 if (index->forwardTxIndex_ == 0 && !index->haveHistorical_)
3528 jvObj.set(jss::account_history_tx_first, true);
3529
3530 jvObj.set(jss::account_history_tx_index, index->forwardTxIndex_++);
3531
3532 jvObj.visit(
3533 info.sink_->getApiVersion(), //
3534 [&](Json::Value const& jv) { info.sink_->send(jv, true); });
3535 }
3536 }
3537}
3538
3539void
3541 std::shared_ptr<ReadView const> const& ledger,
3543 TER result)
3544{
3546 int iProposed = 0;
3547
3548 std::vector<SubAccountHistoryInfo> accountHistoryNotify;
3549
3550 {
3552
3553 if (mSubRTAccount.empty())
3554 return;
3555
3556 if (!mSubAccount.empty() || !mSubRTAccount.empty() ||
3558 {
3559 for (auto const& affectedAccount : tx->getMentionedAccounts())
3560 {
3561 if (auto simiIt = mSubRTAccount.find(affectedAccount);
3562 simiIt != mSubRTAccount.end())
3563 {
3564 auto it = simiIt->second.begin();
3565
3566 while (it != simiIt->second.end())
3567 {
3568 InfoSub::pointer p = it->second.lock();
3569
3570 if (p)
3571 {
3572 notify.insert(p);
3573 ++it;
3574 ++iProposed;
3575 }
3576 else
3577 it = simiIt->second.erase(it);
3578 }
3579 }
3580 }
3581 }
3582 }
3583
3584 JLOG(m_journal.trace()) << "pubProposedAccountTransaction: " << iProposed;
3585
3586 if (!notify.empty() || !accountHistoryNotify.empty())
3587 {
3588 // Create two different Json objects, for different API versions
3589 MultiApiJson jvObj = transJson(tx, result, false, ledger, std::nullopt);
3590
3591 for (InfoSub::ref isrListener : notify)
3592 jvObj.visit(
3593 isrListener->getApiVersion(), //
3594 [&](Json::Value const& jv) { isrListener->send(jv, true); });
3595
3596 XRPL_ASSERT(
3597 jvObj.isMember(jss::account_history_tx_stream) ==
3599 "xrpl::NetworkOPs::pubProposedAccountTransaction : "
3600 "account_history_tx_stream not set");
3601 for (auto& info : accountHistoryNotify)
3602 {
3603 auto& index = info.index_;
3604 if (index->forwardTxIndex_ == 0 && !index->haveHistorical_)
3605 jvObj.set(jss::account_history_tx_first, true);
3606 jvObj.set(jss::account_history_tx_index, index->forwardTxIndex_++);
3607 jvObj.visit(
3608 info.sink_->getApiVersion(), //
3609 [&](Json::Value const& jv) { info.sink_->send(jv, true); });
3610 }
3611 }
3612}
3613
3614//
3615// Monitoring
3616//
3617
3618void
3620 InfoSub::ref isrListener,
3621 hash_set<AccountID> const& vnaAccountIDs,
3622 bool rt)
3623{
3624 SubInfoMapType& subMap = rt ? mSubRTAccount : mSubAccount;
3625
3626 for (auto const& naAccountID : vnaAccountIDs)
3627 {
3628 JLOG(m_journal.trace())
3629 << "subAccount: account: " << toBase58(naAccountID);
3630
3631 isrListener->insertSubAccountInfo(naAccountID, rt);
3632 }
3633
3635
3636 for (auto const& naAccountID : vnaAccountIDs)
3637 {
3638 auto simIterator = subMap.find(naAccountID);
3639 if (simIterator == subMap.end())
3640 {
3641 // Not found, note that account has a new single listener.
3642 SubMapType usisElement;
3643 usisElement[isrListener->getSeq()] = isrListener;
3644 // VFALCO NOTE This is making a needless copy of naAccountID
3645 subMap.insert(simIterator, make_pair(naAccountID, usisElement));
3646 }
3647 else
3648 {
3649 // Found, note that the account has another listener.
3650 simIterator->second[isrListener->getSeq()] = isrListener;
3651 }
3652 }
3653}
3654
3655void
3657 InfoSub::ref isrListener,
3658 hash_set<AccountID> const& vnaAccountIDs,
3659 bool rt)
3660{
3661 for (auto const& naAccountID : vnaAccountIDs)
3662 {
3663 // Remove from the InfoSub
3664 isrListener->deleteSubAccountInfo(naAccountID, rt);
3665 }
3666
3667 // Remove from the server
3668 unsubAccountInternal(isrListener->getSeq(), vnaAccountIDs, rt);
3669}
3670
3671void
3673 std::uint64_t uSeq,
3674 hash_set<AccountID> const& vnaAccountIDs,
3675 bool rt)
3676{
3678
3679 SubInfoMapType& subMap = rt ? mSubRTAccount : mSubAccount;
3680
3681 for (auto const& naAccountID : vnaAccountIDs)
3682 {
3683 auto simIterator = subMap.find(naAccountID);
3684
3685 if (simIterator != subMap.end())
3686 {
3687 // Found
3688 simIterator->second.erase(uSeq);
3689
3690 if (simIterator->second.empty())
3691 {
3692 // Don't need hash entry.
3693 subMap.erase(simIterator);
3694 }
3695 }
3696 }
3697}
3698
3699void
3701{
3702 enum DatabaseType { Sqlite, None };
3703 static auto const databaseType = [&]() -> DatabaseType {
3704 // Use a dynamic_cast to return DatabaseType::None
3705 // on failure.
3706 if (dynamic_cast<SQLiteDatabase*>(&app_.getRelationalDatabase()))
3707 {
3708 return DatabaseType::Sqlite;
3709 }
3710 return DatabaseType::None;
3711 }();
3712
3713 if (databaseType == DatabaseType::None)
3714 {
3715 // LCOV_EXCL_START
3716 UNREACHABLE("xrpl::NetworkOPsImp::addAccountHistoryJob : no database");
3717 JLOG(m_journal.error())
3718 << "AccountHistory job for account "
3719 << toBase58(subInfo.index_->accountId_) << " no database";
3720 if (auto sptr = subInfo.sinkWptr_.lock(); sptr)
3721 {
3722 sptr->send(rpcError(rpcINTERNAL), true);
3723 unsubAccountHistory(sptr, subInfo.index_->accountId_, false);
3724 }
3725 return;
3726 // LCOV_EXCL_STOP
3727 }
3728
3731 "AccountHistoryTxStream",
3732 [this, dbType = databaseType, subInfo]() {
3733 auto const& accountId = subInfo.index_->accountId_;
3734 auto& lastLedgerSeq = subInfo.index_->historyLastLedgerSeq_;
3735 auto& txHistoryIndex = subInfo.index_->historyTxIndex_;
3736
3737 JLOG(m_journal.trace())
3738 << "AccountHistory job for account " << toBase58(accountId)
3739 << " started. lastLedgerSeq=" << lastLedgerSeq;
3740
3741 auto isFirstTx = [&](std::shared_ptr<Transaction> const& tx,
3742 std::shared_ptr<TxMeta> const& meta) -> bool {
3743 /*
3744 * genesis account: first tx is the one with seq 1
3745 * other account: first tx is the one created the account
3746 */
3747 if (accountId == genesisAccountId)
3748 {
3749 auto stx = tx->getSTransaction();
3750 if (stx->getAccountID(sfAccount) == accountId &&
3751 stx->getSeqValue() == 1)
3752 return true;
3753 }
3754
3755 for (auto& node : meta->getNodes())
3756 {
3757 if (node.getFieldU16(sfLedgerEntryType) != ltACCOUNT_ROOT)
3758 continue;
3759
3760 if (node.isFieldPresent(sfNewFields))
3761 {
3762 if (auto inner = dynamic_cast<STObject const*>(
3763 node.peekAtPField(sfNewFields));
3764 inner)
3765 {
3766 if (inner->isFieldPresent(sfAccount) &&
3767 inner->getAccountID(sfAccount) == accountId)
3768 {
3769 return true;
3770 }
3771 }
3772 }
3773 }
3774
3775 return false;
3776 };
3777
3778 auto send = [&](Json::Value const& jvObj,
3779 bool unsubscribe) -> bool {
3780 if (auto sptr = subInfo.sinkWptr_.lock())
3781 {
3782 sptr->send(jvObj, true);
3783 if (unsubscribe)
3784 unsubAccountHistory(sptr, accountId, false);
3785 return true;
3786 }
3787
3788 return false;
3789 };
3790
3791 auto sendMultiApiJson = [&](MultiApiJson const& jvObj,
3792 bool unsubscribe) -> bool {
3793 if (auto sptr = subInfo.sinkWptr_.lock())
3794 {
3795 jvObj.visit(
3796 sptr->getApiVersion(), //
3797 [&](Json::Value const& jv) { sptr->send(jv, true); });
3798
3799 if (unsubscribe)
3800 unsubAccountHistory(sptr, accountId, false);
3801 return true;
3802 }
3803
3804 return false;
3805 };
3806
3807 auto getMoreTxns =
3808 [&](std::uint32_t minLedger,
3809 std::uint32_t maxLedger,
3814 switch (dbType)
3815 {
3816 case Sqlite: {
3817 auto db = static_cast<SQLiteDatabase*>(
3820 accountId, minLedger, maxLedger, marker, 0, true};
3821 return db->newestAccountTxPage(options);
3822 }
3823 // LCOV_EXCL_START
3824 default: {
3825 UNREACHABLE(
3826 "xrpl::NetworkOPsImp::addAccountHistoryJob : "
3827 "getMoreTxns : invalid database type");
3828 return {};
3829 }
3830 // LCOV_EXCL_STOP
3831 }
3832 };
3833
3834 /*
3835 * search backward until the genesis ledger or asked to stop
3836 */
3837 while (lastLedgerSeq >= 2 && !subInfo.index_->stopHistorical_)
3838 {
3839 int feeChargeCount = 0;
3840 if (auto sptr = subInfo.sinkWptr_.lock(); sptr)
3841 {
3842 sptr->getConsumer().charge(Resource::feeMediumBurdenRPC);
3843 ++feeChargeCount;
3844 }
3845 else
3846 {
3847 JLOG(m_journal.trace())
3848 << "AccountHistory job for account "
3849 << toBase58(accountId) << " no InfoSub. Fee charged "
3850 << feeChargeCount << " times.";
3851 return;
3852 }
3853
3854 // try to search in 1024 ledgers till reaching genesis ledgers
3855 auto startLedgerSeq =
3856 (lastLedgerSeq > 1024 + 2 ? lastLedgerSeq - 1024 : 2);
3857 JLOG(m_journal.trace())
3858 << "AccountHistory job for account " << toBase58(accountId)
3859 << ", working on ledger range [" << startLedgerSeq << ","
3860 << lastLedgerSeq << "]";
3861
3862 auto haveRange = [&]() -> bool {
3863 std::uint32_t validatedMin = UINT_MAX;
3864 std::uint32_t validatedMax = 0;
3865 auto haveSomeValidatedLedgers =
3867 validatedMin, validatedMax);
3868
3869 return haveSomeValidatedLedgers &&
3870 validatedMin <= startLedgerSeq &&
3871 lastLedgerSeq <= validatedMax;
3872 }();
3873
3874 if (!haveRange)
3875 {
3876 JLOG(m_journal.debug())
3877 << "AccountHistory reschedule job for account "
3878 << toBase58(accountId) << ", incomplete ledger range ["
3879 << startLedgerSeq << "," << lastLedgerSeq << "]";
3881 return;
3882 }
3883
3885 while (!subInfo.index_->stopHistorical_)
3886 {
3887 auto dbResult =
3888 getMoreTxns(startLedgerSeq, lastLedgerSeq, marker);
3889 if (!dbResult)
3890 {
3891 // LCOV_EXCL_START
3892 UNREACHABLE(
3893 "xrpl::NetworkOPsImp::addAccountHistoryJob : "
3894 "getMoreTxns failed");
3895 JLOG(m_journal.debug())
3896 << "AccountHistory job for account "
3897 << toBase58(accountId) << " getMoreTxns failed.";
3898 send(rpcError(rpcINTERNAL), true);
3899 return;
3900 // LCOV_EXCL_STOP
3901 }
3902
3903 auto const& txns = dbResult->first;
3904 marker = dbResult->second;
3905 size_t num_txns = txns.size();
3906 for (size_t i = 0; i < num_txns; ++i)
3907 {
3908 auto const& [tx, meta] = txns[i];
3909
3910 if (!tx || !meta)
3911 {
3912 JLOG(m_journal.debug())
3913 << "AccountHistory job for account "
3914 << toBase58(accountId) << " empty tx or meta.";
3915 send(rpcError(rpcINTERNAL), true);
3916 return;
3917 }
3918 auto curTxLedger =
3920 tx->getLedger());
3921 if (!curTxLedger)
3922 {
3923 // LCOV_EXCL_START
3924 UNREACHABLE(
3925 "xrpl::NetworkOPsImp::addAccountHistoryJob : "
3926 "getLedgerBySeq failed");
3927 JLOG(m_journal.debug())
3928 << "AccountHistory job for account "
3929 << toBase58(accountId) << " no ledger.";
3930 send(rpcError(rpcINTERNAL), true);
3931 return;
3932 // LCOV_EXCL_STOP
3933 }
3935 tx->getSTransaction();
3936 if (!stTxn)
3937 {
3938 // LCOV_EXCL_START
3939 UNREACHABLE(
3940 "NetworkOPsImp::addAccountHistoryJob : "
3941 "getSTransaction failed");
3942 JLOG(m_journal.debug())
3943 << "AccountHistory job for account "
3944 << toBase58(accountId)
3945 << " getSTransaction failed.";
3946 send(rpcError(rpcINTERNAL), true);
3947 return;
3948 // LCOV_EXCL_STOP
3949 }
3950
3951 auto const mRef = std::ref(*meta);
3952 auto const trR = meta->getResultTER();
3953 MultiApiJson jvTx =
3954 transJson(stTxn, trR, true, curTxLedger, mRef);
3955
3956 jvTx.set(
3957 jss::account_history_tx_index, txHistoryIndex--);
3958 if (i + 1 == num_txns ||
3959 txns[i + 1].first->getLedger() != tx->getLedger())
3960 jvTx.set(jss::account_history_boundary, true);
3961
3962 if (isFirstTx(tx, meta))
3963 {
3964 jvTx.set(jss::account_history_tx_first, true);
3965 sendMultiApiJson(jvTx, false);
3966
3967 JLOG(m_journal.trace())
3968 << "AccountHistory job for account "
3969 << toBase58(accountId)
3970 << " done, found last tx.";
3971 return;
3972 }
3973 else
3974 {
3975 sendMultiApiJson(jvTx, false);
3976 }
3977 }
3978
3979 if (marker)
3980 {
3981 JLOG(m_journal.trace())
3982 << "AccountHistory job for account "
3983 << toBase58(accountId)
3984 << " paging, marker=" << marker->ledgerSeq << ":"
3985 << marker->txnSeq;
3986 }
3987 else
3988 {
3989 break;
3990 }
3991 }
3992
3993 if (!subInfo.index_->stopHistorical_)
3994 {
3995 lastLedgerSeq = startLedgerSeq - 1;
3996 if (lastLedgerSeq <= 1)
3997 {
3998 JLOG(m_journal.trace())
3999 << "AccountHistory job for account "
4000 << toBase58(accountId)
4001 << " done, reached genesis ledger.";
4002 return;
4003 }
4004 }
4005 }
4006 });
4007}
4008
4009void
// Kick off historical-transaction streaming for one account_history
// subscriber, anchored at the given validated ledger.
// NOTE(review): this rendering elides the function-name line and the
// SubAccountHistoryInfoWeak& subInfo parameter (original lines 4010/4012).
4011    std::shared_ptr<ReadView const> const& ledger,
4013{
    // Remember the ledger at which "live" and "historical" txns separate.
4014    subInfo.index_->separationLedgerSeq_ = ledger->seq();
4015    auto const& accountId = subInfo.index_->accountId_;
4016    auto const accountKeylet = keylet::account(accountId);
    // Account absent from this ledger => nothing to stream; skip the job.
4017    if (!ledger->exists(accountKeylet))
4018    {
4019        JLOG(m_journal.debug())
4020            << "subAccountHistoryStart, no account " << toBase58(accountId)
4021            << ", no need to add AccountHistory job.";
4022        return;
4023    }
    // Genesis account with Sequence == 1 has never sent a transaction,
    // so there is no history to walk.
4024    if (accountId == genesisAccountId)
4025    {
4026        if (auto const sleAcct = ledger->read(accountKeylet); sleAcct)
4027        {
4028            if (sleAcct->getFieldU32(sfSequence) == 1)
4029            {
4030                JLOG(m_journal.debug())
4031                    << "subAccountHistoryStart, genesis account "
4032                    << toBase58(accountId)
4033                    << " does not have tx, no need to add AccountHistory job.";
4034                return;
4035            }
4036        }
4037        else
4038        {
4039            // LCOV_EXCL_START
4040            UNREACHABLE(
4041                "xrpl::NetworkOPsImp::subAccountHistoryStart : failed to "
4042                "access genesis account");
4043            return;
4044            // LCOV_EXCL_STOP
4045        }
4046    }
    // Mark where the backward search starts and that historical data exists.
4047    subInfo.index_->historyLastLedgerSeq_ = ledger->seq();
4048    subInfo.index_->haveHistorical_ = true;
4049
4050    JLOG(m_journal.debug())
4051        << "subAccountHistoryStart, add AccountHistory job: accountId="
4052        << toBase58(accountId) << ", currentLedgerSeq=" << ledger->seq();
4053
    // Queue the background job that walks closed ledgers backwards.
4054    addAccountHistoryJob(subInfo);
4055}
4056
// Subscribe a listener to an account's transaction stream plus its
// historical transactions; returns rpcSUCCESS on success.
// NOTE(review): the return-type/name lines and two declaration lines
// (original 4057-4058, 4070-4071, 4076, 4078) are elided in this rendering.
4059    InfoSub::ref isrListener,
4060    AccountID const& accountId)
4061{
    // A duplicate subscription from the same listener is rejected.
4062    if (!isrListener->insertSubAccountHistory(accountId))
4063    {
4064        JLOG(m_journal.debug())
4065            << "subAccountHistory, already subscribed to account "
4066            << toBase58(accountId);
4067        return rpcINVALID_PARAMS;
4068    }
4069
4072        isrListener, std::make_shared<SubAccountHistoryIndex>(accountId)};
    // Register the subscription under the account, creating the inner
    // per-listener map on first use.
4073    auto simIterator = mSubAccountHistory.find(accountId);
4074    if (simIterator == mSubAccountHistory.end())
4075    {
4077        inner.emplace(isrListener->getSeq(), ahi);
4079            simIterator, std::make_pair(accountId, inner));
4080    }
4081    else
4082    {
4083        simIterator->second.emplace(isrListener->getSeq(), ahi);
4084    }
4085
    // Start streaming immediately if a validated ledger is available.
4086    auto const ledger = app_.getLedgerMaster().getValidatedLedger();
4087    if (ledger)
4088    {
4089        subAccountHistoryStart(ledger, ahi);
4090    }
4091    else
4092    {
4093        // The node does not have validated ledgers, so wait for
4094        // one before start streaming.
4095        // In this case, the subscription is also considered successful.
4096        JLOG(m_journal.debug())
4097            << "subAccountHistory, no validated ledger yet, delay start";
4098    }
4099
4100    return rpcSUCCESS;
4101}
4102
4103void
// Remove an account_history subscription.  When historyOnly is true only
// the historical back-fill is stopped; the listener-side record is kept.
// (The function-name line, original 4104, is elided in this rendering.)
4105    InfoSub::ref isrListener,
4106    AccountID const& account,
4107    bool historyOnly)
4108{
4109    if (!historyOnly)
4110        isrListener->deleteSubAccountHistory(account);
    // Delegate the server-side bookkeeping, keyed by listener sequence.
4111    unsubAccountHistoryInternal(isrListener->getSeq(), account, historyOnly);
4112}
4113
4114void
// Core of account_history unsubscription, keyed by InfoSub sequence.
// NOTE(review): the function-name line and one interior line (original
// 4115/4120, presumably a lock) are elided in this rendering.
4116    std::uint64_t seq,
4117    AccountID const& account,
4118    bool historyOnly)
4119{
4121    auto simIterator = mSubAccountHistory.find(account);
4122    if (simIterator != mSubAccountHistory.end())
4123    {
4124        auto& subInfoMap = simIterator->second;
4125        auto subInfoIter = subInfoMap.find(seq);
4126        if (subInfoIter != subInfoMap.end())
4127        {
            // Signal the background AccountHistory job to stop walking
            // backwards through closed ledgers.
4128            subInfoIter->second.index_->stopHistorical_ = true;
4129        }
4130
        // A full unsubscribe also removes the map entry, and the account's
        // slot once no listeners remain.
4131        if (!historyOnly)
4132        {
4133            simIterator->second.erase(seq);
4134            if (simIterator->second.empty())
4135            {
4136                mSubAccountHistory.erase(simIterator);
4137            }
4138        }
4139        JLOG(m_journal.debug())
4140            << "unsubAccountHistory, account " << toBase58(account)
4141            << ", historyOnly = " << (historyOnly ? "true" : "false");
4142    }
4143}
4144
4145bool
// Subscribe a listener to an order book's update stream; always reports
// success.  (The signature line, original 4146, is elided here.)
4147{
4148    if (auto listeners = app_.getOrderBookDB().makeBookListeners(book))
4149        listeners->addSubscriber(isrListener);
4150    else
4151    {
4152        // LCOV_EXCL_START
4153        UNREACHABLE("xrpl::NetworkOPsImp::subBook : null book listeners");
4154        // LCOV_EXCL_STOP
4155    }
4156    return true;
4157}
4158
4159bool
// Unsubscribe a listener (by sequence number) from an order book stream;
// always reports success.  (The signature line, original 4160, is elided.)
4161{
4162    if (auto listeners = app_.getOrderBookDB().getBookListeners(book))
4163        listeners->removeSubscriber(uSeq);
4164
4165    return true;
4166}
4167
4171{
    // Standalone-only path used by the `ledger_accept` RPC: run one
    // simulated consensus round and return the new open ledger's sequence.
    // NOTE(review): the return-type/signature lines (original 4168-4170,
    // including the consensusDelay parameter) are elided in this rendering.
4172    // This code-path is exclusively used when the server is in standalone
4173    // mode via `ledger_accept`
4174    XRPL_ASSERT(
4175        m_standalone, "xrpl::NetworkOPsImp::acceptLedger : is standalone");
4176
4177    if (!m_standalone)
4178        Throw<std::runtime_error>(
4179            "Operation only possible in STANDALONE mode.");
4180
4181    // FIXME Could we improve on this and remove the need for a specialized
4182    // API in Consensus?
4183    beginConsensus(m_ledgerMaster.getClosedLedger()->header().hash, {});
4184    mConsensus.simulate(app_.timeKeeper().closeTime(), consensusDelay);
4185    return m_ledgerMaster.getCurrentLedger()->header().seq;
4186}
4187
4188// <-- bool: true=added, false=already there
4189bool
// Subscribe a listener to the validated-ledger stream and pre-fill
// jvResult with a snapshot of the latest validated ledger.
// NOTE(review): the signature line and three interior lines (original
// 4190, 4206, 4209, 4212) are elided in this rendering.
4191{
4192    if (auto lpClosed = m_ledgerMaster.getValidatedLedger())
4193    {
4194        jvResult[jss::ledger_index] = lpClosed->header().seq;
4195        jvResult[jss::ledger_hash] = to_string(lpClosed->header().hash);
4196        jvResult[jss::ledger_time] = Json::Value::UInt(
4197            lpClosed->header().closeTime.time_since_epoch().count());
        // fee_ref is only reported on pre-XRPFees networks.
4198        if (!lpClosed->rules().enabled(featureXRPFees))
4199            jvResult[jss::fee_ref] = Config::FEE_UNITS_DEPRECATED;
4200        jvResult[jss::fee_base] = lpClosed->fees().base.jsonClipped();
4201        jvResult[jss::reserve_base] = lpClosed->fees().reserve.jsonClipped();
4202        jvResult[jss::reserve_inc] = lpClosed->fees().increment.jsonClipped();
4203        jvResult[jss::network_id] = app_.config().NETWORK_ID;
4204    }
4205
4207    {
4208        jvResult[jss::validated_ledgers] =
4210    }
4211
    // emplace's .second is false when this listener was already subscribed.
4213    return mStreamMaps[sLedger]
4214        .emplace(isrListener->getSeq(), isrListener)
4215        .second;
4216}
4217
4218// <-- bool: true=added, false=already there
4219bool
4221{
4224 .emplace(isrListener->getSeq(), isrListener)
4225 .second;
4226}
4227
4228// <-- bool: true=erased, false=was not there
4229bool
// Remove a listener from the validated-ledger stream; true when erased.
// (Signature/lock lines, original 4230/4232, are elided here.)
4231{
4233    return mStreamMaps[sLedger].erase(uSeq);
4234}
4235
4236// <-- bool: true=erased, false=was not there
4237bool
4243
4244// <-- bool: true=added, false=already there
4245bool
// Subscribe a listener to the manifests stream; emplace's .second reports
// whether the listener was newly added.  (Signature/lock lines, original
// 4246/4248, are elided here.)
4247{
4249    return mStreamMaps[sManifests]
4250        .emplace(isrListener->getSeq(), isrListener)
4251        .second;
4252}
4253
4254// <-- bool: true=erased, false=was not there
4255bool
4261
4262// <-- bool: true=added, false=already there
4263bool
// Subscribe a listener to the `server` status stream and pre-fill
// jvResult with the current server snapshot (random nonce, load metrics,
// host id, node public key).
// NOTE(review): the name line and two interior lines (original 4264,
// 4284, 4286) are elided in this rendering.
4265    InfoSub::ref isrListener,
4266    Json::Value& jvResult,
4267    bool admin)
4268{
4269    uint256 uRandom;
4270
4271    if (m_standalone)
4272        jvResult[jss::stand_alone] = m_standalone;
4273
4274    // CHECKME: is it necessary to provide a random number here?
4275    beast::rngfill(uRandom.begin(), uRandom.size(), crypto_prng());
4276
4277    auto const& feeTrack = app_.getFeeTrack();
4278    jvResult[jss::random] = to_string(uRandom);
4279    jvResult[jss::server_status] = strOperatingMode(admin);
4280    jvResult[jss::load_base] = feeTrack.getLoadBase();
4281    jvResult[jss::load_factor] = feeTrack.getLoadFactor();
4282    jvResult[jss::hostid] = getHostId(admin);
4283    jvResult[jss::pubkey_node] =
4285
4287    return mStreamMaps[sServer]
4288        .emplace(isrListener->getSeq(), isrListener)
4289        .second;
4290}
4291
4292// <-- bool: true=erased, false=was not there
4293bool
// Remove a listener from the `server` stream; true when erased.
// (Signature/lock lines, original 4294/4296, are elided here.)
4295{
4297    return mStreamMaps[sServer].erase(uSeq);
4298}
4299
4300// <-- bool: true=added, false=already there
4301bool
4303{
4306 .emplace(isrListener->getSeq(), isrListener)
4307 .second;
4308}
4309
4310// <-- bool: true=erased, false=was not there
4311bool
4317
4318// <-- bool: true=added, false=already there
4319bool
4321{
4324 .emplace(isrListener->getSeq(), isrListener)
4325 .second;
4326}
4327
4328// <-- bool: true=erased, false=was not there
4329bool
4335
4336// <-- bool: true=added, false=already there
4337bool
4339{
4342 .emplace(isrListener->getSeq(), isrListener)
4343 .second;
4344}
4345
4346void
4351
4352// <-- bool: true=erased, false=was not there
4353bool
4359
4360// <-- bool: true=added, false=already there
4361bool
4363{
4365 return mStreamMaps[sPeerStatus]
4366 .emplace(isrListener->getSeq(), isrListener)
4367 .second;
4368}
4369
4370// <-- bool: true=erased, false=was not there
4371bool
4377
4378// <-- bool: true=added, false=already there
4379bool
4381{
4384 .emplace(isrListener->getSeq(), isrListener)
4385 .second;
4386}
4387
4388// <-- bool: true=erased, false=was not there
4389bool
4395
4398{
    // Look up an RPC-URL subscription; returns an empty pointer when the
    // URL has no entry.  (Signature and an interior line, original
    // 4396-4397/4399, are elided in this rendering.)
4400
4401    subRpcMapType::iterator it = mRpcSubMap.find(strUrl);
4402
4403    if (it != mRpcSubMap.end())
4404        return it->second;
4405
4406    return InfoSub::pointer();
4407}
4408
4411{
    // Register an InfoSub under its RPC URL and echo the entry back.
    // (Signature and an interior line, original 4409-4410/4412, are
    // elided in this rendering.)
4413
4414    mRpcSubMap.emplace(strUrl, rspEntry);
4415
4416    return rspEntry;
4417}
4418
4419bool
// Remove an RPC-URL subscription, but only if no stream map still holds
// the entry; returns true when the entry was erased.
// (Signature and an interior line, original 4420/4422, are elided here.)
4421{
4423    auto pInfo = findRpcSub(strUrl);
4424
4425    if (!pInfo)
4426        return false;
4427
4428    // check to see if any of the stream maps still hold a weak reference to
4429    // this entry before removing
4430    for (SubMapType const& map : mStreamMaps)
4431    {
4432        if (map.find(pInfo->getSeq()) != map.end())
4433            return false;
4434    }
4435    mRpcSubMap.erase(strUrl);
4436    return true;
4437}
4438
4439#ifndef USE_NEW_BOOK_PAGE
4440
4441// NIKB FIXME this should be looked at. There's no reason why this shouldn't
4442// work, but it demonstrated poor performance.
4443//
4444void
// Build the `offers` array for a book_offers-style request by walking the
// order book's quality directories in order, tracking each owner's
// remaining funds so later offers by the same owner are shown partially
// funded.  NOTE(review): this rendering elides the function-name/lpLedger
// lines, the running-balance map declaration, and one accountHolds
// argument (original 4445, 4457, 4562).
4446    Book const& book,
4447    AccountID const& uTakerID,
4448    bool const bProof,
4449    unsigned int iLimit,
4450    Json::Value const& jvMarker,
4451    Json::Value& jvResult)
4452{ // CAUTION: This is the old get book page logic
4453    Json::Value& jvOffers =
4454        (jvResult[jss::offers] = Json::Value(Json::arrayValue));
4455
4458    uint256 const uBookBase = getBookBase(book);
4459    uint256 const uBookEnd = getQualityNext(uBookBase);
4460    uint256 uTipIndex = uBookBase;
4461
4462    if (auto stream = m_journal.trace())
4463    {
4464        stream << "getBookPage:" << book;
4465        stream << "getBookPage: uBookBase=" << uBookBase;
4466        stream << "getBookPage: uBookEnd=" << uBookEnd;
4467        stream << "getBookPage: uTipIndex=" << uTipIndex;
4468    }
4469
4470    ReadView const& view = *lpLedger;
4471
    // Under a global freeze on either side, non-issuer offers are treated
    // as unfunded (see the bGlobalFreeze branch below).
4472    bool const bGlobalFreeze = isGlobalFrozen(view, book.out.account) ||
4473        isGlobalFrozen(view, book.in.account);
4474
4475    bool bDone = false;
4476    bool bDirectAdvance = true;
4477
4478    std::shared_ptr<SLE const> sleOfferDir;
4479    uint256 offerIndex;
4480    unsigned int uBookEntry;
4481    STAmount saDirRate;
4482
4483    auto const rate = transferRate(view, book.out.account);
4484    auto viewJ = app_.journal("View");
4485
4486    while (!bDone && iLimit-- > 0)
4487    {
4488        if (bDirectAdvance)
4489        {
            // Advance to the next quality directory page in the book.
4490            bDirectAdvance = false;
4491
4492            JLOG(m_journal.trace()) << "getBookPage: bDirectAdvance";
4493
4494            auto const ledgerIndex = view.succ(uTipIndex, uBookEnd);
4495            if (ledgerIndex)
4496                sleOfferDir = view.read(keylet::page(*ledgerIndex));
4497            else
4498                sleOfferDir.reset();
4499
4500            if (!sleOfferDir)
4501            {
4502                JLOG(m_journal.trace()) << "getBookPage: bDone";
4503                bDone = true;
4504            }
4505            else
4506            {
4507                uTipIndex = sleOfferDir->key();
4508                saDirRate = amountFromQuality(getQuality(uTipIndex));
4509
4510                cdirFirst(view, uTipIndex, sleOfferDir, uBookEntry, offerIndex);
4511
4512                JLOG(m_journal.trace())
4513                    << "getBookPage: uTipIndex=" << uTipIndex;
4514                JLOG(m_journal.trace())
4515                    << "getBookPage: offerIndex=" << offerIndex;
4516            }
4517        }
4518
4519        if (!bDone)
4520        {
4521            auto sleOffer = view.read(keylet::offer(offerIndex));
4522
4523            if (sleOffer)
4524            {
4525                auto const uOfferOwnerID = sleOffer->getAccountID(sfAccount);
4526                auto const& saTakerGets = sleOffer->getFieldAmount(sfTakerGets);
4527                auto const& saTakerPays = sleOffer->getFieldAmount(sfTakerPays);
4528                STAmount saOwnerFunds;
4529                bool firstOwnerOffer(true);
4530
4531                if (book.out.account == uOfferOwnerID)
4532                {
4533                    // If an offer is selling issuer's own IOUs, it is fully
4534                    // funded.
4535                    saOwnerFunds = saTakerGets;
4536                }
4537                else if (bGlobalFreeze)
4538                {
4539                    // If either asset is globally frozen, consider all offers
4540                    // that aren't ours to be totally unfunded
4541                    saOwnerFunds.clear(book.out);
4542                }
4543                else
4544                {
4545                    auto umBalanceEntry = umBalance.find(uOfferOwnerID);
4546                    if (umBalanceEntry != umBalance.end())
4547                    {
4548                        // Found in running balance table.
4549
4550                        saOwnerFunds = umBalanceEntry->second;
4551                        firstOwnerOffer = false;
4552                    }
4553                    else
4554                    {
4555                        // Did not find balance in table.
4556
4557                        saOwnerFunds = accountHolds(
4558                            view,
4559                            uOfferOwnerID,
4560                            book.out.currency,
4561                            book.out.account,
4563                            viewJ);
4564
4565                        if (saOwnerFunds < beast::zero)
4566                        {
4567                            // Treat negative funds as zero.
4568
4569                            saOwnerFunds.clear();
4570                        }
4571                    }
4572                }
4573
4574                Json::Value jvOffer = sleOffer->getJson(JsonOptions::none);
4575
4576                STAmount saTakerGetsFunded;
4577                STAmount saOwnerFundsLimit = saOwnerFunds;
4578                Rate offerRate = parityRate;
4579
4580                if (rate != parityRate
4581                    // Have a transfer fee.
4582                    && uTakerID != book.out.account
4583                    // Not taking offers of own IOUs.
4584                    && book.out.account != uOfferOwnerID)
4585                // Offer owner not issuing ownfunds
4586                {
4587                    // Need to charge a transfer fee to offer owner.
4588                    offerRate = rate;
4589                    saOwnerFundsLimit = divide(saOwnerFunds, offerRate);
4590                }
4591
4592                if (saOwnerFundsLimit >= saTakerGets)
4593                {
4594                    // Sufficient funds no shenanigans.
4595                    saTakerGetsFunded = saTakerGets;
4596                }
4597                else
4598                {
4599                    // Only provide, if not fully funded.
4600
4601                    saTakerGetsFunded = saOwnerFundsLimit;
4602
4603                    saTakerGetsFunded.setJson(jvOffer[jss::taker_gets_funded]);
4604                    std::min(
4605                        saTakerPays,
4606                        multiply(
4607                            saTakerGetsFunded, saDirRate, saTakerPays.issue()))
4608                        .setJson(jvOffer[jss::taker_pays_funded]);
4609                }
4610
4611                STAmount saOwnerPays = (parityRate == offerRate)
4612                    ? saTakerGetsFunded
4613                    : std::min(
4614                          saOwnerFunds, multiply(saTakerGetsFunded, offerRate));
4615
                // Deduct what this offer would consume so the same owner's
                // next offer in the walk shows the reduced balance.
4616                umBalance[uOfferOwnerID] = saOwnerFunds - saOwnerPays;
4617
4618                // Include all offers funded and unfunded
4619                Json::Value& jvOf = jvOffers.append(jvOffer);
4620                jvOf[jss::quality] = saDirRate.getText();
4621
4622                if (firstOwnerOffer)
4623                    jvOf[jss::owner_funds] = saOwnerFunds.getText();
4624            }
4625            else
4626            {
4627                JLOG(m_journal.warn()) << "Missing offer";
4628            }
4629
4630            if (!cdirNext(view, uTipIndex, sleOfferDir, uBookEntry, offerIndex))
4631            {
4632                bDirectAdvance = true;
4633            }
4634            else
4635            {
4636                JLOG(m_journal.trace())
4637                    << "getBookPage: offerIndex=" << offerIndex;
4638            }
4639        }
4640    }
4641
4642    // jvResult[jss::marker] = Json::Value(Json::arrayValue);
4643    // jvResult[jss::nodes] = Json::Value(Json::arrayValue);
4644}
4645
4646#else
4647
4648// This is the new code that uses the book iterators
4649// It has temporarily been disabled
4650
4651void
4654 Book const& book,
4655 AccountID const& uTakerID,
4656 bool const bProof,
4657 unsigned int iLimit,
4658 Json::Value const& jvMarker,
4659 Json::Value& jvResult)
4660{
4661 auto& jvOffers = (jvResult[jss::offers] = Json::Value(Json::arrayValue));
4662
4664
4665 MetaView lesActive(lpLedger, tapNONE, true);
4666 OrderBookIterator obIterator(lesActive, book);
4667
4668 auto const rate = transferRate(lesActive, book.out.account);
4669
4670 bool const bGlobalFreeze = lesActive.isGlobalFrozen(book.out.account) ||
4671 lesActive.isGlobalFrozen(book.in.account);
4672
4673 while (iLimit-- > 0 && obIterator.nextOffer())
4674 {
4675 SLE::pointer sleOffer = obIterator.getCurrentOffer();
4676 if (sleOffer)
4677 {
4678 auto const uOfferOwnerID = sleOffer->getAccountID(sfAccount);
4679 auto const& saTakerGets = sleOffer->getFieldAmount(sfTakerGets);
4680 auto const& saTakerPays = sleOffer->getFieldAmount(sfTakerPays);
4681 STAmount saDirRate = obIterator.getCurrentRate();
4682 STAmount saOwnerFunds;
4683
4684 if (book.out.account == uOfferOwnerID)
4685 {
4686 // If offer is selling issuer's own IOUs, it is fully funded.
4687 saOwnerFunds = saTakerGets;
4688 }
4689 else if (bGlobalFreeze)
4690 {
4691 // If either asset is globally frozen, consider all offers
4692 // that aren't ours to be totally unfunded
4693 saOwnerFunds.clear(book.out);
4694 }
4695 else
4696 {
4697 auto umBalanceEntry = umBalance.find(uOfferOwnerID);
4698
4699 if (umBalanceEntry != umBalance.end())
4700 {
4701 // Found in running balance table.
4702
4703 saOwnerFunds = umBalanceEntry->second;
4704 }
4705 else
4706 {
4707 // Did not find balance in table.
4708
4709 saOwnerFunds = lesActive.accountHolds(
4710 uOfferOwnerID,
4711 book.out.currency,
4712 book.out.account,
4714
4715 if (saOwnerFunds.isNegative())
4716 {
4717 // Treat negative funds as zero.
4718
4719 saOwnerFunds.zero();
4720 }
4721 }
4722 }
4723
4724 Json::Value jvOffer = sleOffer->getJson(JsonOptions::none);
4725
4726 STAmount saTakerGetsFunded;
4727 STAmount saOwnerFundsLimit = saOwnerFunds;
4728 Rate offerRate = parityRate;
4729
4730 if (rate != parityRate
4731 // Have a transfer fee.
4732 && uTakerID != book.out.account
4733 // Not taking offers of own IOUs.
4734 && book.out.account != uOfferOwnerID)
4735 // Offer owner not issuing ownfunds
4736 {
4737 // Need to charge a transfer fee to offer owner.
4738 offerRate = rate;
4739 saOwnerFundsLimit = divide(saOwnerFunds, offerRate);
4740 }
4741
4742 if (saOwnerFundsLimit >= saTakerGets)
4743 {
4744 // Sufficient funds no shenanigans.
4745 saTakerGetsFunded = saTakerGets;
4746 }
4747 else
4748 {
4749 // Only provide, if not fully funded.
4750 saTakerGetsFunded = saOwnerFundsLimit;
4751
4752 saTakerGetsFunded.setJson(jvOffer[jss::taker_gets_funded]);
4753
4754 // TODO(tom): The result of this expression is not used - what's
4755 // going on here?
4756 std::min(
4757 saTakerPays,
4758 multiply(saTakerGetsFunded, saDirRate, saTakerPays.issue()))
4759 .setJson(jvOffer[jss::taker_pays_funded]);
4760 }
4761
4762 STAmount saOwnerPays = (parityRate == offerRate)
4763 ? saTakerGetsFunded
4764 : std::min(
4765 saOwnerFunds, multiply(saTakerGetsFunded, offerRate));
4766
4767 umBalance[uOfferOwnerID] = saOwnerFunds - saOwnerPays;
4768
4769 if (!saOwnerFunds.isZero() || uOfferOwnerID == uTakerID)
4770 {
4771 // Only provide funded offers and offers of the taker.
4772 Json::Value& jvOf = jvOffers.append(jvOffer);
4773 jvOf[jss::quality] = saDirRate.getText();
4774 }
4775 }
4776 }
4777
4778 // jvResult[jss::marker] = Json::Value(Json::arrayValue);
4779 // jvResult[jss::nodes] = Json::Value(Json::arrayValue);
4780}
4781
4782#endif
4783
4784inline void
4786{
4787 auto [counters, mode, start, initialSync] = accounting_.getCounterData();
4788 auto const current = std::chrono::duration_cast<std::chrono::microseconds>(
4790 counters[static_cast<std::size_t>(mode)].dur += current;
4791
4794 counters[static_cast<std::size_t>(OperatingMode::DISCONNECTED)]
4795 .dur.count());
4797 counters[static_cast<std::size_t>(OperatingMode::CONNECTED)]
4798 .dur.count());
4800 counters[static_cast<std::size_t>(OperatingMode::SYNCING)].dur.count());
4802 counters[static_cast<std::size_t>(OperatingMode::TRACKING)]
4803 .dur.count());
4805 counters[static_cast<std::size_t>(OperatingMode::FULL)].dur.count());
4806
4808 counters[static_cast<std::size_t>(OperatingMode::DISCONNECTED)]
4809 .transitions);
4811 counters[static_cast<std::size_t>(OperatingMode::CONNECTED)]
4812 .transitions);
4814 counters[static_cast<std::size_t>(OperatingMode::SYNCING)].transitions);
4816 counters[static_cast<std::size_t>(OperatingMode::TRACKING)]
4817 .transitions);
4819 counters[static_cast<std::size_t>(OperatingMode::FULL)].transitions);
4820}
4821
4822void
// Record a transition into operating mode `om`: bump that mode's
// transition count, attribute the elapsed interval to the mode being
// left, and on the very first transition to FULL capture the
// initial-sync duration.  (The qualified-name line, original 4823, is
// elided in this rendering.)
4824{
4825    auto now = std::chrono::steady_clock::now();
4826
4827    std::lock_guard lock(mutex_);
4828    ++counters_[static_cast<std::size_t>(om)].transitions;
4829    if (om == OperatingMode::FULL &&
4830        counters_[static_cast<std::size_t>(om)].transitions == 1)
4831    {
        // First time reaching FULL: time since process start is the
        // initial sync duration, in microseconds.
4832        initialSyncUs_ = std::chrono::duration_cast<std::chrono::microseconds>(
4833                             now - processStart_)
4834                             .count();
4835    }
    // Close out the time spent in the previous mode before switching.
4836    counters_[static_cast<std::size_t>(mode_)].dur +=
4837        std::chrono::duration_cast<std::chrono::microseconds>(now - start_);
4838
4839    mode_ = om;
4840    start_ = now;
4841}
4842
4843void
// Emit the per-mode transition/duration counters as the
// `state_accounting` JSON object, plus the time spent so far in the
// current mode and, once set, the initial-sync duration.
// NOTE(review): the qualified-name line and two interior lines (original
// 4844, 4848, 4852) are elided in this rendering.
4845{
4846    auto [counters, mode, start, initialSync] = getCounterData();
    // Fold the in-progress interval for the current mode into its counter.
4847    auto const current = std::chrono::duration_cast<std::chrono::microseconds>(
4849    counters[static_cast<std::size_t>(mode)].dur += current;
4850
4851    obj[jss::state_accounting] = Json::objectValue;
4853         i <= static_cast<std::size_t>(OperatingMode::FULL);
4854         ++i)
4855    {
4856        obj[jss::state_accounting][states_[i]] = Json::objectValue;
4857        auto& state = obj[jss::state_accounting][states_[i]];
        // Counters are serialized as strings (64-bit safe for JSON).
4858        state[jss::transitions] = std::to_string(counters[i].transitions);
4859        state[jss::duration_us] = std::to_string(counters[i].dur.count());
4860    }
4861    obj[jss::server_state_duration_us] = std::to_string(current.count());
4862    if (initialSync)
4863        obj[jss::initial_sync_duration_us] = std::to_string(initialSync);
4864}
4865
4866//------------------------------------------------------------------------------
4867
// Factory: construct the concrete NetworkOPsImp behind the NetworkOPs
// interface, forwarding every dependency unchanged.
// NOTE(review): this rendering elides the return-type/name lines and two
// parameter/body lines (original 4868-4869, 4871, 4876, 4882, 4889).
4870    Application& app,
4872    bool standalone,
4873    std::size_t minPeerCount,
4874    bool startvalid,
4875    JobQueue& job_queue,
4877    ValidatorKeys const& validatorKeys,
4878    boost::asio::io_context& io_svc,
4879    beast::Journal journal,
4880    beast::insight::Collector::ptr const& collector)
4881{
4883        app,
4884        clock,
4885        standalone,
4886        minPeerCount,
4887        startvalid,
4888        job_queue,
4890        validatorKeys,
4891        io_svc,
4892        journal,
4893        collector);
4894}
4895
4896} // namespace xrpl
T any_of(T... args)
T back_inserter(T... args)
T begin(T... args)
Decorator for streaming out compact json.
Lightweight wrapper to tag static string.
Definition json_value.h:45
Represents a JSON value.
Definition json_value.h:131
Json::UInt UInt
Definition json_value.h:138
Value & append(Value const &value)
Append value to array at the end.
bool isMember(char const *key) const
Return true if the object has a member named key.
Value get(UInt index, Value const &defaultValue) const
If the array contains at least index+1 elements, returns the element value, otherwise returns default...
A generic endpoint for log messages.
Definition Journal.h:41
Stream error() const
Definition Journal.h:327
Stream debug() const
Definition Journal.h:309
Stream info() const
Definition Journal.h:315
Stream trace() const
Severity stream access functions.
Definition Journal.h:303
Stream warn() const
Definition Journal.h:321
A metric for measuring an integral value.
Definition Gauge.h:21
void set(value_type value) const
Set the value on the gauge.
Definition Gauge.h:49
A reference to a handler for performing polled collection.
Definition Hook.h:13
A transaction that is in a closed ledger.
TxMeta const & getMeta() const
boost::container::flat_set< AccountID > const & getAffected() const
std::shared_ptr< STTx const > const & getTxn() const
virtual std::optional< NetClock::time_point > firstUnsupportedExpected() const =0
virtual perf::PerfLog & getPerfLog()=0
virtual TaggedCache< uint256, AcceptedLedger > & getAcceptedLedgerCache()=0
virtual std::chrono::milliseconds getIOLatency()=0
virtual TxQ & getTxQ()=0
virtual Cluster & cluster()=0
virtual Config & config()=0
virtual InboundLedgers & getInboundLedgers()=0
virtual LoadFeeTrack & getFeeTrack()=0
virtual LedgerMaster & getLedgerMaster()=0
virtual beast::Journal journal(std::string const &name)=0
virtual NodeStore::Database & getNodeStore()=0
virtual ServerHandler & getServerHandler()=0
virtual TimeKeeper & timeKeeper()=0
virtual std::optional< PublicKey const > getValidationPublicKey() const =0
virtual OpenLedger & openLedger()=0
virtual AmendmentTable & getAmendmentTable()=0
virtual OrderBookDB & getOrderBookDB()=0
virtual Overlay & overlay()=0
virtual JobQueue & getJobQueue()=0
virtual ManifestCache & validatorManifests()=0
virtual RelationalDatabase & getRelationalDatabase()=0
virtual std::pair< PublicKey, SecretKey > const & nodeIdentity()=0
virtual ValidatorList & validators()=0
bool exists(std::string const &name) const
Returns true if a section with the given name exists.
Section & section(std::string const &name)
Returns the section with the given name.
Specifies an order book.
Definition Book.h:17
Issue in
Definition Book.h:19
Issue out
Definition Book.h:20
Holds transactions which were deferred to the next pass of consensus.
The role of a ClosureCounter is to assist in shutdown by letting callers wait for the completion of c...
std::uint32_t getLoadFee() const
Definition ClusterNode.h:33
NetClock::time_point getReportTime() const
Definition ClusterNode.h:39
PublicKey const & identity() const
Definition ClusterNode.h:45
std::string const & name() const
Definition ClusterNode.h:27
std::size_t size() const
The number of nodes in the cluster list.
Definition Cluster.cpp:30
uint32_t NETWORK_ID
Definition Config.h:138
std::string SERVER_DOMAIN
Definition Config.h:260
int RELAY_UNTRUSTED_VALIDATIONS
Definition Config.h:151
static constexpr std::uint32_t FEE_UNITS_DEPRECATED
Definition Config.h:142
std::size_t NODE_SIZE
Definition Config.h:195
virtual Json::Value getInfo()=0
virtual void clearFailures()=0
std::shared_ptr< InfoSub > pointer
Definition InfoSub.h:35
Currency currency
Definition Issue.h:16
AccountID account
Definition Issue.h:17
A pool of threads to perform work.
Definition JobQueue.h:38
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:148
Json::Value getJson(int c=0)
Definition JobQueue.cpp:193
std::chrono::seconds getValidatedLedgerAge()
bool getValidatedRange(std::uint32_t &minVal, std::uint32_t &maxVal)
std::shared_ptr< Ledger const > getLedgerBySeq(std::uint32_t index)
bool haveValidated()
Whether we have ever fully validated a ledger.
std::size_t getFetchPackCacheSize() const
std::shared_ptr< Ledger const > getClosedLedger()
std::string getCompleteLedgers()
std::shared_ptr< Ledger const > getValidatedLedger()
std::shared_ptr< ReadView const > getPublishedLedger()
std::shared_ptr< ReadView const > getCurrentLedger()
Manages the current fee schedule.
std::uint32_t getClusterFee() const
std::uint32_t getLocalFee() const
std::uint32_t getRemoteFee() const
std::uint32_t getLoadFactor() const
std::uint32_t getLoadBase() const
Manages load sources.
Definition LoadManager.h:27
void heartbeat()
Reset the stall detection timer.
PublicKey getMasterKey(PublicKey const &pk) const
Returns ephemeral signing key's master public key.
Definition Manifest.cpp:303
State accounting records two attributes for each possible server state: 1) Amount of time spent in ea...
void json(Json::Value &obj) const
Output state counters in JSON format.
std::chrono::steady_clock::time_point const processStart_
static std::array< Json::StaticString const, 5 > const states_
std::array< Counters, 5 > counters_
void mode(OperatingMode om)
Record state transition.
std::chrono::steady_clock::time_point start_
Transaction with input flags and results to be applied in batches.
std::shared_ptr< Transaction > const transaction
TransactionStatus(std::shared_ptr< Transaction > t, bool a, bool l, FailHard f)
std::string getHostId(bool forAdmin)
void reportConsensusStateChange(ConsensusPhase phase)
void addAccountHistoryJob(SubAccountHistoryInfoWeak subInfo)
void clearNeedNetworkLedger() override
ServerFeeSummary mLastFeeSummary
Json::Value getOwnerInfo(std::shared_ptr< ReadView const > lpLedger, AccountID const &account) override
DispatchState mDispatchState
std::size_t const minPeerCount_
beast::Journal m_journal
static std::array< char const *, 5 > const states_
std::set< uint256 > pendingValidations_
void pubAccountTransaction(std::shared_ptr< ReadView const > const &ledger, AcceptedLedgerTx const &transaction, bool last)
ClosureCounter< void, boost::system::error_code const & > waitHandlerCounter_
MultiApiJson transJson(std::shared_ptr< STTx const > const &transaction, TER result, bool validated, std::shared_ptr< ReadView const > const &ledger, std::optional< std::reference_wrapper< TxMeta const > > meta)
bool unsubManifests(std::uint64_t uListener) override
void pubPeerStatus(std::function< Json::Value(void)> const &) override
void unsubAccount(InfoSub::ref ispListener, hash_set< AccountID > const &vnaAccountIDs, bool rt) override
bool subManifests(InfoSub::ref ispListener) override
void stateAccounting(Json::Value &obj) override
void pubLedger(std::shared_ptr< ReadView const > const &lpAccepted) override
void stop() override
SubInfoMapType mSubRTAccount
void subAccount(InfoSub::ref ispListener, hash_set< AccountID > const &vnaAccountIDs, bool rt) override
void transactionBatch()
Apply transactions in batches.
bool unsubRTTransactions(std::uint64_t uListener) override
void getBookPage(std::shared_ptr< ReadView const > &lpLedger, Book const &, AccountID const &uTakerID, bool const bProof, unsigned int iLimit, Json::Value const &jvMarker, Json::Value &jvResult) override
bool processTrustedProposal(RCLCxPeerPos proposal) override
error_code_i subAccountHistory(InfoSub::ref ispListener, AccountID const &account) override
subscribe an account's new transactions and retrieve the account's historical transactions
void subAccountHistoryStart(std::shared_ptr< ReadView const > const &ledger, SubAccountHistoryInfoWeak &subInfo)
void pubValidation(std::shared_ptr< STValidation > const &val) override
bool subBook(InfoSub::ref ispListener, Book const &) override
InfoSub::pointer addRpcSub(std::string const &strUrl, InfoSub::ref) override
void endConsensus(std::unique_ptr< std::stringstream > const &clog) override
std::atomic< OperatingMode > mMode
void setMode(OperatingMode om) override
void setAmendmentBlocked() override
void pubConsensus(ConsensusPhase phase)
bool const m_standalone
std::recursive_mutex mSubLock
bool isNeedNetworkLedger() override
DispatchState
Synchronization states for transaction batches.
std::atomic< bool > needNetworkLedger_
boost::asio::steady_timer heartbeatTimer_
bool subConsensus(InfoSub::ref ispListener) override
bool unsubBook(std::uint64_t uListener, Book const &) override
bool unsubLedger(std::uint64_t uListener) override
bool checkLastClosedLedger(Overlay::PeerSequence const &, uint256 &networkClosed)
void pubProposedAccountTransaction(std::shared_ptr< ReadView const > const &ledger, std::shared_ptr< STTx const > const &transaction, TER result)
void unsubAccountHistoryInternal(std::uint64_t seq, AccountID const &account, bool historyOnly) override
void pubValidatedTransaction(std::shared_ptr< ReadView const > const &ledger, AcceptedLedgerTx const &transaction, bool last)
void switchLastClosedLedger(std::shared_ptr< Ledger const > const &newLCL)
std::optional< PublicKey > const validatorPK_
std::atomic< bool > amendmentBlocked_
bool isFull() override
void clearAmendmentWarned() override
void updateLocalTx(ReadView const &view) override
void clearLedgerFetch() override
void apply(std::unique_lock< std::mutex > &batchLock)
Attempt to apply transactions and post-process based on the results.
InfoSub::pointer findRpcSub(std::string const &strUrl) override
bool isAmendmentBlocked() override
std::string strOperatingMode(OperatingMode const mode, bool const admin) const override
std::unique_ptr< LocalTxs > m_localTX
void setStandAlone() override
void setNeedNetworkLedger() override
bool subServer(InfoSub::ref ispListener, Json::Value &jvResult, bool admin) override
void setTimer(boost::asio::steady_timer &timer, std::chrono::milliseconds const &expiry_time, std::function< void()> onExpire, std::function< void()> onError)
bool unsubServer(std::uint64_t uListener) override
SubAccountHistoryMapType mSubAccountHistory
bool unsubConsensus(std::uint64_t uListener) override
std::condition_variable mCond
void pubManifest(Manifest const &) override
RCLConsensus mConsensus
void consensusViewChange() override
boost::asio::steady_timer accountHistoryTxTimer_
Json::Value getConsensusInfo() override
bool recvValidation(std::shared_ptr< STValidation > const &val, std::string const &source) override
void setUNLBlocked() override
bool unsubValidations(std::uint64_t uListener) override
bool subPeerStatus(InfoSub::ref ispListener) override
void doTransactionAsync(std::shared_ptr< Transaction > transaction, bool bUnlimited, FailHard failtype)
For transactions not submitted by a locally connected client, fire and forget.
ConsensusPhase mLastConsensusPhase
OperatingMode getOperatingMode() const override
std::optional< PublicKey > const validatorMasterPK_
void doTransactionSyncBatch(std::unique_lock< std::mutex > &lock, std::function< bool(std::unique_lock< std::mutex > const &)> retryCallback)
NetworkOPsImp(Application &app, NetworkOPs::clock_type &clock, bool standalone, std::size_t minPeerCount, bool start_valid, JobQueue &job_queue, LedgerMaster &ledgerMaster, ValidatorKeys const &validatorKeys, boost::asio::io_context &io_svc, beast::Journal journal, beast::insight::Collector::ptr const &collector)
std::array< SubMapType, SubTypes::sLastEntry > mStreamMaps
std::vector< TransactionStatus > mTransactions
bool tryRemoveRpcSub(std::string const &strUrl) override
bool beginConsensus(uint256 const &networkClosed, std::unique_ptr< std::stringstream > const &clog) override
void doTransactionSync(std::shared_ptr< Transaction > transaction, bool bUnlimited, FailHard failType)
For transactions submitted directly by a client, apply batch of transactions and wait for this transa...
void submitTransaction(std::shared_ptr< STTx const > const &) override
void setAmendmentWarned() override
LedgerMaster & m_ledgerMaster
Json::Value getServerInfo(bool human, bool admin, bool counters) override
StateAccounting accounting_
bool subValidations(InfoSub::ref ispListener) override
void setAccountHistoryJobTimer(SubAccountHistoryInfoWeak subInfo)
bool subRTTransactions(InfoSub::ref ispListener) override
std::atomic< bool > unlBlocked_
subRpcMapType mRpcSubMap
bool unsubBookChanges(std::uint64_t uListener) override
void unsubAccountHistory(InfoSub::ref ispListener, AccountID const &account, bool historyOnly) override
unsubscribe an account's transactions
void setStateTimer() override
Called to initially start our timers.
std::size_t getLocalTxCount() override
bool preProcessTransaction(std::shared_ptr< Transaction > &transaction)
void processTransaction(std::shared_ptr< Transaction > &transaction, bool bUnlimited, bool bLocal, FailHard failType) override
Process transactions as they arrive from the network or which are submitted by clients.
bool unsubTransactions(std::uint64_t uListener) override
bool isAmendmentWarned() override
bool subTransactions(InfoSub::ref ispListener) override
std::mutex m_statsMutex
std::mutex validationsMutex_
std::uint32_t acceptLedger(std::optional< std::chrono::milliseconds > consensusDelay) override
Accepts the current transaction tree, return the new ledger's sequence.
SubInfoMapType mSubAccount
void clearUNLBlocked() override
bool isUNLBlocked() override
void pubProposedTransaction(std::shared_ptr< ReadView const > const &ledger, std::shared_ptr< STTx const > const &transaction, TER result) override
std::atomic< bool > amendmentWarned_
boost::asio::steady_timer clusterTimer_
bool unsubPeerStatus(std::uint64_t uListener) override
void reportFeeChange() override
void processTransactionSet(CanonicalTXSet const &set) override
Process a set of transactions synchronously, and ensuring that they are processed in one batch.
void mapComplete(std::shared_ptr< SHAMap > const &map, bool fromAcquire) override
bool subLedger(InfoSub::ref ispListener, Json::Value &jvResult) override
bool isBlocked() override
~NetworkOPsImp() override
Application & app_
Json::Value getLedgerFetchInfo() override
void unsubAccountInternal(std::uint64_t seq, hash_set< AccountID > const &vnaAccountIDs, bool rt) override
bool subBookChanges(InfoSub::ref ispListener) override
Provides server functionality for clients.
Definition NetworkOPs.h:70
void getCountsJson(Json::Value &obj)
Definition Database.cpp:248
std::shared_ptr< OpenView const > current() const
Returns a view to the current open ledger.
Writable ledger view that accumulates state and tx changes.
Definition OpenView.h:46
BookListeners::pointer getBookListeners(Book const &)
void processTxn(std::shared_ptr< ReadView const > const &ledger, AcceptedLedgerTx const &alTx, MultiApiJson const &jvObj)
BookListeners::pointer makeBookListeners(Book const &)
virtual std::uint64_t getPeerDisconnect() const =0
virtual std::optional< std::uint32_t > networkID() const =0
Returns the ID of the network this server is configured for, if any.
virtual std::uint64_t getPeerDisconnectCharges() const =0
virtual std::uint64_t getJqTransOverflow() const =0
virtual std::size_t size() const =0
Returns the number of active peers.
Manages the generic consensus algorithm for use by the RCL.
std::size_t prevProposers() const
Get the number of proposing peers that participated in the previous round.
void simulate(NetClock::time_point const &now, std::optional< std::chrono::milliseconds > consensusDelay)
Json::Value getJson(bool full) const
std::chrono::milliseconds prevRoundTime() const
Get duration of the previous round.
A peer's signed, proposed position for use in RCLConsensus.
PublicKey const & publicKey() const
Public key of peer that sent the proposal.
Represents a set of transactions in RCLConsensus.
Definition RCLCxTx.h:44
Wraps a ledger instance for use in generic Validations LedgerTrie.
static std::string getWordFromBlob(void const *blob, size_t bytes)
Chooses a single dictionary word from the data.
Definition RFC1751.cpp:487
Collects logging information.
std::unique_ptr< std::stringstream > const & ss()
A view into a ledger.
Definition ReadView.h:32
virtual std::optional< key_type > succ(key_type const &key, std::optional< key_type > const &last=std::nullopt) const =0
Return the key of the next state item.
virtual std::shared_ptr< SLE const > read(Keylet const &k) const =0
Return the state item associated with a key.
Issue const & issue() const
Definition STAmount.h:488
std::string getText() const override
Definition STAmount.cpp:663
void setJson(Json::Value &) const
Definition STAmount.cpp:623
std::optional< T > get(std::string const &name) const
std::size_t size() const noexcept
Definition Serializer.h:53
void const * data() const noexcept
Definition Serializer.h:59
void setup(Setup const &setup, beast::Journal journal)
time_point now() const override
Returns the current time, using the server's clock.
Definition TimeKeeper.h:45
std::chrono::seconds closeOffset() const
Definition TimeKeeper.h:64
time_point closeTime() const
Returns the predicted close time, in network time.
Definition TimeKeeper.h:57
Metrics getMetrics(OpenView const &view) const
Returns fee metrics in reference fee level units.
Definition TxQ.cpp:1756
static time_point now()
Validator keys and manifest as set in configuration file.
std::optional< PublicKey > localPublicKey() const
This function returns the local validator public key or a std::nullopt.
std::size_t quorum() const
Get quorum value for current trusted key set.
std::optional< TimeKeeper::time_point > expires() const
Return the time when the validator list will expire.
std::size_t count() const
Return the number of configured validator list sites.
std::optional< PublicKey > getTrustedKey(PublicKey const &identity) const
Returns master public key if public key is trusted.
Json::Value jsonClipped() const
Definition XRPAmount.h:199
constexpr double decimalXRP() const
Definition XRPAmount.h:243
iterator begin()
Definition base_uint.h:117
bool isZero() const
Definition base_uint.h:521
static constexpr std::size_t size()
Definition base_uint.h:507
bool isNonZero() const
Definition base_uint.h:526
virtual Json::Value currentJson() const =0
Render currently executing jobs and RPC calls and durations in Json.
virtual Json::Value countersJson() const =0
Render performance counters in Json.
Automatically unlocks and re-locks a unique_lock object.
Definition scope.h:212
T clear(T... args)
T emplace_back(T... args)
T emplace(T... args)
T empty(T... args)
T end(T... args)
T erase(T... args)
T find(T... args)
T get(T... args)
T insert(T... args)
T is_same_v
T is_sorted(T... args)
T lock(T... args)
T make_pair(T... args)
T max(T... args)
T min(T... args)
@ arrayValue
array value (ordered list)
Definition json_value.h:26
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:27
int Int
unsigned int UInt
void rngfill(void *const buffer, std::size_t const bytes, Generator &g)
Definition rngfill.h:15
STL namespace.
std::string const & getVersionString()
Server version.
Definition BuildInfo.cpp:49
std::optional< std::string > encodeCTID(uint32_t ledgerSeq, uint32_t txnIndex, uint32_t networkID) noexcept
Encodes ledger sequence, transaction index, and network ID into a CTID string.
Definition CTID.h:34
Json::Value computeBookChanges(std::shared_ptr< L const > const &lpAccepted)
Definition BookChanges.h:28
void insertMPTokenIssuanceID(Json::Value &response, std::shared_ptr< STTx const > const &transaction, TxMeta const &transactionMeta)
void insertDeliveredAmount(Json::Value &meta, ReadView const &, std::shared_ptr< STTx const > const &serializedTx, TxMeta const &)
Add a delivered_amount field to the meta input/output parameter.
void insertNFTSyntheticInJson(Json::Value &, std::shared_ptr< STTx const > const &, TxMeta const &)
Adds common synthetic fields to transaction-related JSON responses.
Charge const feeMediumBurdenRPC
TER valid(STTx const &tx, ReadView const &view, AccountID const &src, beast::Journal j)
Keylet offer(AccountID const &id, std::uint32_t seq) noexcept
An offer from an account.
Definition Indexes.cpp:256
Keylet account(AccountID const &id) noexcept
AccountID root.
Definition Indexes.cpp:166
Keylet page(uint256 const &root, std::uint64_t index=0) noexcept
A page in a directory.
Definition Indexes.cpp:362
Rate rate(Env &env, Account const &account, std::uint32_t const &seq)
Definition escrow.cpp:50
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
std::unique_ptr< FeeVote > make_FeeVote(FeeSetup const &setup, beast::Journal journal)
Create an instance of the FeeVote logic.
STAmount divide(STAmount const &amount, Rate const &rate)
Definition Rate2.cpp:74
@ terQUEUED
Definition TER.h:206
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
bool isTerRetry(TER x) noexcept
Definition TER.h:653
@ fhZERO_IF_FROZEN
Definition View.h:59
@ fhIGNORE_FREEZE
Definition View.h:59
csprng_engine & crypto_prng()
The default cryptographically secure PRNG.
std::optional< std::uint64_t > mulDiv(std::uint64_t value, std::uint64_t mul, std::uint64_t div)
Return value*mul/div accurately.
Json::Value getJson(LedgerFill const &fill)
Return a new Json::Value representing the ledger with given options.
@ INVALID
Definition Transaction.h:29
@ OBSOLETE
Definition Transaction.h:35
@ INCLUDED
Definition Transaction.h:30
constexpr std::uint32_t tfInnerBatchTxn
Definition TxFlags.h:42
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:611
std::string strHex(FwdIt begin, FwdIt end)
Definition strHex.h:11
Rules makeRulesGivenLedger(DigestAwareReadView const &ledger, Rules const &current)
Definition ReadView.cpp:50
@ tefPAST_SEQ
Definition TER.h:156
std::uint64_t getQuality(uint256 const &uBase)
Definition Indexes.cpp:131
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
Definition AccountID.cpp:95
FeeSetup setup_FeeVote(Section const &section)
Definition Config.cpp:1110
Number root(Number f, unsigned d)
Definition Number.cpp:644
std::unique_ptr< NetworkOPs > make_NetworkOPs(Application &app, NetworkOPs::clock_type &clock, bool standalone, std::size_t minPeerCount, bool startvalid, JobQueue &job_queue, LedgerMaster &ledgerMaster, ValidatorKeys const &validatorKeys, boost::asio::io_context &io_svc, beast::Journal journal, beast::insight::Collector::ptr const &collector)
bool transResultInfo(TER code, std::string &token, std::string &text)
Definition TER.cpp:230
bool cdirNext(ReadView const &view, uint256 const &root, std::shared_ptr< SLE const > &page, unsigned int &index, uint256 &entry)
Returns the next entry in the directory, advancing the index.
Definition View.cpp:137
STAmount multiply(STAmount const &amount, Rate const &rate)
Definition Rate2.cpp:34
static auto const genesisAccountId
Seed generateSeed(std::string const &passPhrase)
Generate a seed deterministically.
Definition Seed.cpp:57
bool cdirFirst(ReadView const &view, uint256 const &root, std::shared_ptr< SLE const > &page, unsigned int &index, uint256 &entry)
Returns the first entry in the directory, advancing the index.
Definition View.cpp:126
std::pair< PublicKey, SecretKey > generateKeyPair(KeyType type, Seed const &seed)
Generate a key pair deterministically.
STAmount accountHolds(ReadView const &view, AccountID const &account, Currency const &currency, AccountID const &issuer, FreezeHandling zeroIfFrozen, beast::Journal j)
Definition View.cpp:461
@ current
This was a new validation and was added.
constexpr std::size_t maxPoppedTransactions
STAmount accountFunds(ReadView const &view, AccountID const &id, STAmount const &saDefault, FreezeHandling freezeHandling, beast::Journal j)
Definition View.cpp:657
bool isGlobalFrozen(ReadView const &view, AccountID const &issuer)
Definition View.cpp:163
STAmount amountFromQuality(std::uint64_t rate)
Definition STAmount.cpp:965
@ jtNETOP_CLUSTER
Definition Job.h:55
@ jtCLIENT_CONSENSUS
Definition Job.h:28
@ jtTXN_PROC
Definition Job.h:62
@ jtCLIENT_ACCT_HIST
Definition Job.h:29
@ jtTRANSACTION
Definition Job.h:42
@ jtCLIENT_FEE_CHANGE
Definition Job.h:27
@ jtBATCH
Definition Job.h:45
bool isTefFailure(TER x) noexcept
Definition TER.h:647
Rate transferRate(ReadView const &view, AccountID const &issuer)
Returns IOU issuer transfer fee as Rate.
Definition View.cpp:864
auto constexpr muldiv_max
Definition mulDiv.h:9
uint256 getQualityNext(uint256 const &uBase)
Definition Indexes.cpp:123
ConsensusPhase
Phases of consensus for a single ledger round.
send_if_pred< Predicate > send_if(std::shared_ptr< Message > const &m, Predicate const &f)
Helper function to aid in type deduction.
Definition predicates.h:56
AccountID calcAccountID(PublicKey const &pk)
uint256 getBookBase(Book const &book)
Definition Indexes.cpp:98
Json::Value rpcError(error_code_i iError)
Definition RPCErr.cpp:12
std::string to_string_iso(date::sys_time< Duration > tp)
Definition chrono.h:73
std::unique_ptr< LocalTxs > make_LocalTxs()
Definition LocalTxs.cpp:173
std::pair< Validity, std::string > checkValidity(HashRouter &router, STTx const &tx, Rules const &rules, Config const &config)
Checks transaction signature and local checks.
Definition apply.cpp:25
ApplyFlags
Definition ApplyView.h:11
@ tapNONE
Definition ApplyView.h:12
@ tapFAIL_HARD
Definition ApplyView.h:16
@ tapUNLIMITED
Definition ApplyView.h:23
bool isTelLocal(TER x) noexcept
Definition TER.h:635
@ ledgerMaster
ledger master data for signing
@ proposal
proposal for signing
@ temINVALID_FLAG
Definition TER.h:92
@ temBAD_SIGNATURE
Definition TER.h:86
bool isTesSuccess(TER x) noexcept
Definition TER.h:659
static std::uint32_t trunc32(std::uint64_t v)
void forAllApiVersions(Fn const &fn, Args &&... args)
Definition ApiVersion.h:158
static std::array< char const *, 5 > const stateNames
void handleNewValidation(Application &app, std::shared_ptr< STValidation > const &val, std::string const &source, BypassAccept const bypassAccept, std::optional< beast::Journal > j)
Handle a new validation.
OperatingMode
Specifies the mode under which the server believes it's operating.
Definition NetworkOPs.h:49
@ TRACKING
convinced we agree with the network
@ DISCONNECTED
not ready to process requests
@ CONNECTED
convinced we are talking to the network
@ FULL
we have the ledger and can even validate
@ SYNCING
fallen slightly behind
std::shared_ptr< STTx const > sterilize(STTx const &stx)
Sterilize a transaction.
Definition STTx.cpp:801
bool isTemMalformed(TER x) noexcept
Definition TER.h:641
@ tesSUCCESS
Definition TER.h:226
error_code_i
Definition ErrorCodes.h:21
@ rpcINTERNAL
Definition ErrorCodes.h:111
@ rpcINVALID_PARAMS
Definition ErrorCodes.h:65
@ rpcSUCCESS
Definition ErrorCodes.h:25
Rate const parityRate
A transfer rate signifying a 1:1 exchange.
@ warnRPC_AMENDMENT_BLOCKED
Definition ErrorCodes.h:155
@ warnRPC_UNSUPPORTED_MAJORITY
Definition ErrorCodes.h:154
@ warnRPC_EXPIRED_VALIDATOR_LIST
Definition ErrorCodes.h:156
T owns_lock(T... args)
T ref(T... args)
T reserve(T... args)
T reset(T... args)
T set_intersection(T... args)
T size(T... args)
T str(T... args)
PublicKey masterKey
The master key associated with this manifest.
Definition Manifest.h:67
std::string serialized
The manifest in serialized form.
Definition Manifest.h:64
Blob getMasterSignature() const
Returns manifest master key signature.
Definition Manifest.cpp:235
std::string domain
The domain, if one was specified in the manifest; empty otherwise.
Definition Manifest.h:79
std::optional< Blob > getSignature() const
Returns manifest signature.
Definition Manifest.cpp:224
std::optional< PublicKey > signingKey
The ephemeral key associated with this manifest.
Definition Manifest.h:73
std::uint32_t sequence
The sequence number of this manifest.
Definition Manifest.h:76
Server fees published on server subscription.
std::optional< TxQ::Metrics > em
bool operator!=(ServerFeeSummary const &b) const
bool operator==(ServerFeeSummary const &b) const
beast::insight::Gauge syncing_duration
beast::insight::Gauge tracking_duration
beast::insight::Gauge connected_duration
beast::insight::Gauge tracking_transitions
Stats(Handler const &handler, beast::insight::Collector::ptr const &collector)
beast::insight::Gauge connected_transitions
beast::insight::Gauge full_transitions
beast::insight::Gauge disconnected_duration
beast::insight::Gauge syncing_transitions
beast::insight::Gauge disconnected_transitions
beast::insight::Gauge full_duration
beast::insight::Hook hook
SubAccountHistoryIndex(AccountID const &accountId)
std::shared_ptr< SubAccountHistoryIndex > index_
std::shared_ptr< SubAccountHistoryIndex > index_
Represents a transfer rate.
Definition Rate.h:21
Data format for exchanging consumption information across peers.
Definition Gossip.h:13
std::vector< Item > items
Definition Gossip.h:25
Changes in trusted nodes after updating validator list.
hash_set< NodeID > added
hash_set< NodeID > removed
Structure returned by TxQ::getMetrics, expressed in reference fee level units.
Definition TxQ.h:146
void set(char const *key, auto const &v)
IsMemberResult isMember(char const *key) const
Select all peers (except optional excluded) that are in our cluster.
Definition predicates.h:118
Sends a message to all peers.
Definition predicates.h:13
T swap(T... args)
T time_since_epoch(T... args)
T to_string(T... args)
T unlock(T... args)
T value_or(T... args)
T what(T... args)