xrpld
Loading...
Searching...
No Matches
NetworkOPs.cpp
1#include <xrpl/server/NetworkOPs.h>
2
3#include <xrpld/app/consensus/RCLConsensus.h>
4#include <xrpld/app/consensus/RCLCxPeerPos.h>
5#include <xrpld/app/consensus/RCLValidations.h>
6#include <xrpld/app/ledger/AcceptedLedger.h>
7#include <xrpld/app/ledger/InboundLedger.h>
8#include <xrpld/app/ledger/InboundLedgers.h>
9#include <xrpld/app/ledger/LedgerMaster.h>
10#include <xrpld/app/ledger/LedgerToJson.h>
11#include <xrpld/app/ledger/LocalTxs.h>
12#include <xrpld/app/ledger/OpenLedger.h>
13#include <xrpld/app/ledger/TransactionMaster.h>
14#include <xrpld/app/main/LoadManager.h>
15#include <xrpld/app/main/Tuning.h>
16#include <xrpld/app/misc/DeliverMax.h>
17#include <xrpld/app/misc/FeeVote.h>
18#include <xrpld/app/misc/Transaction.h>
19#include <xrpld/app/misc/TxQ.h>
20#include <xrpld/app/misc/ValidatorKeys.h>
21#include <xrpld/app/misc/ValidatorList.h>
22#include <xrpld/app/misc/make_NetworkOPs.h>
23#include <xrpld/app/rdb/backend/SQLiteDatabase.h>
24#include <xrpld/consensus/ConsensusParms.h>
25#include <xrpld/consensus/ConsensusTypes.h>
26#include <xrpld/core/Config.h>
27#include <xrpld/overlay/Cluster.h>
28#include <xrpld/overlay/ClusterNode.h>
29#include <xrpld/overlay/Overlay.h>
30#include <xrpld/overlay/predicates.h>
31#include <xrpld/rpc/BookChanges.h>
32#include <xrpld/rpc/CTID.h>
33#include <xrpld/rpc/DeliveredAmount.h>
34#include <xrpld/rpc/MPTokenIssuanceID.h>
35#include <xrpld/rpc/ServerHandler.h>
36
37#include <xrpl/basics/Log.h>
38#include <xrpl/basics/ToString.h>
39#include <xrpl/basics/UnorderedContainers.h>
40#include <xrpl/basics/UptimeClock.h>
41#include <xrpl/basics/base_uint.h>
42#include <xrpl/basics/chrono.h>
43#include <xrpl/basics/contract.h>
44#include <xrpl/basics/mulDiv.h>
45#include <xrpl/basics/safe_cast.h>
46#include <xrpl/basics/scope.h>
47#include <xrpl/basics/strHex.h>
48#include <xrpl/beast/clock/abstract_clock.h>
49#include <xrpl/beast/insight/Collector.h>
50#include <xrpl/beast/insight/Gauge.h>
51#include <xrpl/beast/insight/Hook.h>
52#include <xrpl/beast/net/IPEndpoint.h>
53#include <xrpl/beast/utility/Zero.h>
54#include <xrpl/beast/utility/instrumentation.h>
55#include <xrpl/beast/utility/rngfill.h>
56#include <xrpl/config/Constants.h>
57#include <xrpl/core/ClosureCounter.h>
58#include <xrpl/core/HashRouter.h>
59#include <xrpl/core/Job.h>
60#include <xrpl/core/NetworkIDService.h>
61#include <xrpl/core/PerfLog.h>
62#include <xrpl/core/ServiceRegistry.h>
63#include <xrpl/crypto/RFC1751.h>
64#include <xrpl/crypto/csprng.h>
65#include <xrpl/git/Git.h>
66#include <xrpl/json/json_forwards.h>
67#include <xrpl/json/json_value.h>
68#include <xrpl/json/json_writer.h>
69#include <xrpl/ledger/AcceptedLedgerTx.h>
70#include <xrpl/ledger/AmendmentTable.h>
71#include <xrpl/ledger/ApplyView.h>
72#include <xrpl/ledger/CanonicalTXSet.h>
73#include <xrpl/ledger/Ledger.h>
74#include <xrpl/ledger/OpenView.h>
75#include <xrpl/ledger/OrderBookDB.h>
76#include <xrpl/ledger/ReadView.h>
77#include <xrpl/ledger/helpers/AccountRootHelpers.h>
78#include <xrpl/ledger/helpers/DirectoryHelpers.h>
79#include <xrpl/ledger/helpers/TokenHelpers.h>
80#include <xrpl/protocol/AccountID.h>
81#include <xrpl/protocol/ApiVersion.h>
82#include <xrpl/protocol/Book.h>
83#include <xrpl/protocol/BuildInfo.h>
84#include <xrpl/protocol/ErrorCodes.h>
85#include <xrpl/protocol/Feature.h>
86#include <xrpl/protocol/Fees.h>
87#include <xrpl/protocol/Indexes.h>
88#include <xrpl/protocol/KeyType.h>
89#include <xrpl/protocol/LedgerFormats.h>
90#include <xrpl/protocol/MultiApiJson.h>
91#include <xrpl/protocol/NFTSyntheticSerializer.h>
92#include <xrpl/protocol/Protocol.h>
93#include <xrpl/protocol/PublicKey.h>
94#include <xrpl/protocol/RPCErr.h>
95#include <xrpl/protocol/Rate.h>
96#include <xrpl/protocol/SField.h>
97#include <xrpl/protocol/STAmount.h>
98#include <xrpl/protocol/STTx.h>
99#include <xrpl/protocol/SecretKey.h>
100#include <xrpl/protocol/Seed.h>
101#include <xrpl/protocol/Serializer.h>
102#include <xrpl/protocol/TER.h>
103#include <xrpl/protocol/TxFlags.h>
104#include <xrpl/protocol/TxFormats.h>
105#include <xrpl/protocol/Units.h>
106#include <xrpl/protocol/XRPAmount.h>
107#include <xrpl/protocol/jss.h>
108#include <xrpl/protocol/tokens.h>
109#include <xrpl/rdb/RelationalDatabase.h>
110#include <xrpl/resource/Fees.h>
111#include <xrpl/resource/Gossip.h>
112#include <xrpl/resource/ResourceManager.h>
113#include <xrpl/server/InfoSub.h>
114#include <xrpl/server/LoadFeeTrack.h>
115#include <xrpl/server/Manifest.h>
116#include <xrpl/shamap/SHAMap.h>
117#include <xrpl/tx/apply.h>
118
119#include <boost/asio/error.hpp>
120#include <boost/asio/io_context.hpp>
121#include <boost/asio/ip/host_name.hpp>
122#include <boost/asio/steady_timer.hpp>
123#include <boost/system/detail/errc.hpp>
124#include <boost/system/detail/error_code.hpp>
125#include <boost/system/system_error.hpp>
126
127#include <xrpl.pb.h>
128
129#include <algorithm>
130#include <array>
131#include <atomic>
132#include <chrono>
133#include <climits>
134#include <condition_variable>
135#include <cstddef>
136#include <cstdint>
137#include <cstdlib>
138#include <exception>
139#include <functional>
140#include <iterator>
141#include <limits>
142#include <memory>
143#include <mutex>
144#include <optional>
145#include <set>
146#include <sstream>
147#include <stdexcept>
148#include <string>
149#include <string_view>
150#include <type_traits>
151#include <unordered_map>
152#include <utility>
153#include <vector>
154
155namespace xrpl {
156
157class NetworkOPsImp final : public NetworkOPs
158{
162
164 {
165 public:
167 bool const admin;
168 bool const local;
170 bool applied = false;
172
174 : transaction(std::move(t)), admin(a), local(l), failType(f)
175 {
176 XRPL_ASSERT(
178 "xrpl::NetworkOPsImp::TransactionStatus::TransactionStatus : "
179 "valid inputs");
180 }
181 };
182
186 enum class DispatchState : unsigned char {
190 };
191
193
209 {
217
221 std::chrono::steady_clock::time_point start_ = std::chrono::steady_clock::now();
222 std::chrono::steady_clock::time_point const processStart_ = start_;
225
226 public:
228 {
229 counters_[static_cast<std::size_t>(OperatingMode::DISCONNECTED)].transitions = 1;
230 }
231
238 void
240
246 void
247 json(json::Value& obj) const;
248
256
257 CounterData
259 {
260 std::scoped_lock const lock(mutex_);
261 return {
262 .counters = counters_,
263 .mode = mode_,
264 .start = start_,
265 .initialSyncUs = initialSyncUs_};
266 }
267 };
268
271 {
272 ServerFeeSummary() = default;
273
275 XRPAmount fee,
276 TxQ::Metrics escalationMetrics, // trivially copyable
277 LoadFeeTrack const& loadFeeTrack);
278 bool
279 operator!=(ServerFeeSummary const& b) const;
280
281 bool
283 {
284 return !(*this != b);
285 }
286
291 };
292
293public:
295 ServiceRegistry& registry,
297 bool standalone,
298 std::size_t minPeerCount,
299 bool startValid,
300 JobQueue& jobQueue,
301 LedgerMaster& ledgerMaster,
302 ValidatorKeys const& validatorKeys,
303 boost::asio::io_context& ioCtx,
305 beast::insight::Collector::ptr const& collector)
306 : registry_(registry)
310 , heartbeatTimer_(ioCtx)
311 , clusterTimer_(ioCtx)
313 , consensus_(
314 registry_.get().getApp(),
316 setupFeeVote(registry_.get().getApp().config().section(Sections::kVoting)),
317 registry_.get().getJournal("FeeVote")),
318 ledgerMaster,
319 *localTX_,
320 registry.getInboundTransactions(),
321 beast::getAbstractClock<std::chrono::steady_clock>(),
322 validatorKeys,
323 registry_.get().getJournal("LedgerConsensus"))
324 , validatorPK_(
325 validatorKeys.keys ? validatorKeys.keys->publicKey : decltype(validatorPK_){})
327 validatorKeys.keys ? validatorKeys.keys->masterPublicKey
328 : decltype(validatorMasterPK_){})
329 , ledgerMaster_(ledgerMaster)
330 , jobQueue_(jobQueue)
331 , standalone_(standalone)
332 , minPeerCount_(startValid ? 0 : minPeerCount)
334 {
335 }
336
// Destructor. Empties rpcSubMap_ in the destructor body, i.e. before any
// data member of this object is destroyed.
337 ~NetworkOPsImp() override
338 {
339 // This clear() is necessary to ensure the shared_ptrs in this map get
340 // destroyed NOW, because the objects in this map invoke methods on this
341 // class when they are destroyed.
342 rpcSubMap_.clear();
343 }
344
345public:
347 getOperatingMode() const override;
348
350 strOperatingMode(OperatingMode const mode, bool const admin) const override;
351
353 strOperatingMode(bool const admin = false) const override;
354
355 //
356 // Transaction operations.
357 //
358
359 // Must complete immediately.
360 void
362
363 void
365 std::shared_ptr<Transaction>& transaction,
366 bool bUnlimited,
367 bool bLocal,
368 FailHard failType) override;
369
370 void
372
381 void
382 doTransactionSync(std::shared_ptr<Transaction> transaction, bool bUnlimited, FailHard failType);
383
393 void
396 bool bUnlimited,
397 FailHard failtype);
398
399private:
400 bool
402
403 void
406 std::function<bool(std::unique_lock<std::mutex> const&)> retryCallback);
407
408public:
412 void
414
420 void
422
423 //
424 // Owner functions.
425 //
426
428 getOwnerInfo(std::shared_ptr<ReadView const> lpLedger, AccountID const& account) override;
429
430 //
431 // Book functions.
432 //
433
434 void
437 Book const&,
438 AccountID const& uTakerID,
439 bool const bProof,
440 unsigned int iLimit,
441 json::Value const& jvMarker,
442 json::Value& jvResult) override;
443
444 // Ledger proposal/close functions.
445 bool
446 processTrustedProposal(RCLCxPeerPos proposal) override;
447
448 bool
449 recvValidation(std::shared_ptr<STValidation> const& val, std::string const& source) override;
450
451 void
452 mapComplete(std::shared_ptr<SHAMap> const& map, bool fromAcquire) override;
453
454 // Network state machine.
455
456 // Used for the "jump" case.
457private:
458 void
460 bool
462
463public:
464 bool
465 beginConsensus(uint256 const& networkClosed, std::unique_ptr<std::stringstream> const& clog)
466 override;
467 void
469 void
470 setStandAlone() override;
471
475 void
476 setStateTimer() override;
477
478 void
479 setNeedNetworkLedger() override;
480 void
481 clearNeedNetworkLedger() override;
482 bool
483 isNeedNetworkLedger() override;
484 bool
485 isFull() override;
486
487 void
488 setMode(OperatingMode om) override;
489
490 bool
491 isBlocked() override;
492 bool
493 isAmendmentBlocked() override;
494 void
495 setAmendmentBlocked() override;
496 bool
497 isAmendmentWarned() override;
498 void
499 setAmendmentWarned() override;
500 void
501 clearAmendmentWarned() override;
502 bool
503 isUNLBlocked() override;
504 void
505 setUNLBlocked() override;
506 void
507 clearUNLBlocked() override;
508 void
509 consensusViewChange() override;
510
512 getConsensusInfo() override;
514 getServerInfo(bool human, bool admin, bool counters) override;
515 void
516 clearLedgerFetch() override;
518 getLedgerFetchInfo() override;
521 void
522 reportFeeChange() override;
523 void
525
526 void
527 updateLocalTx(ReadView const& view) override;
529 getLocalTxCount() override;
531 getBookSubscribersCount() override;
532
533 //
534 // Monitoring: publisher side.
535 //
536 void
537 pubLedger(std::shared_ptr<ReadView const> const& lpAccepted) override;
538 void
541 std::shared_ptr<STTx const> const& transaction,
542 TER result) override;
543 void
544 pubValidation(std::shared_ptr<STValidation> const& val) override;
545
546 //--------------------------------------------------------------------------
547 //
548 // InfoSub::Source.
549 //
550 void
551 subAccount(InfoSub::ref ispListener, hash_set<AccountID> const& vnaAccountIDs, bool rt)
552 override;
553 void
554 unsubAccount(InfoSub::ref ispListener, hash_set<AccountID> const& vnaAccountIDs, bool rt)
555 override;
556
557 // Remove the subscription from this class's tracking only, not from the
558 // InfoSub itself. Needed during InfoSub destruction.
559 void
560 unsubAccountInternal(std::uint64_t seq, hash_set<AccountID> const& vnaAccountIDs, bool rt)
561 override;
562
564 subAccountHistory(InfoSub::ref ispListener, AccountID const& account) override;
565 void
566 unsubAccountHistory(InfoSub::ref ispListener, AccountID const& account, bool historyOnly)
567 override;
568
569 void
570 unsubAccountHistoryInternal(std::uint64_t seq, AccountID const& account, bool historyOnly)
571 override;
572
573 bool
574 subLedger(InfoSub::ref ispListener, json::Value& jvResult) override;
575 bool
576 unsubLedger(std::uint64_t uListener) override;
577
578 bool
579 subBookChanges(InfoSub::ref ispListener) override;
580 bool
581 unsubBookChanges(std::uint64_t uListener) override;
582
583 bool
584 subServer(InfoSub::ref ispListener, json::Value& jvResult, bool admin) override;
585 bool
586 unsubServer(std::uint64_t uListener) override;
587
588 bool
589 subBook(InfoSub::ref ispListener, Book const&) override;
590 bool
591 unsubBook(InfoSub::ref ispListener, Book const&) override;
592 bool
593 unsubBookInternal(std::uint64_t uListener, Book const&) override;
594
595 bool
596 subManifests(InfoSub::ref ispListener) override;
597 bool
598 unsubManifests(std::uint64_t uListener) override;
599 void
600 pubManifest(Manifest const&) override;
601
602 bool
603 subTransactions(InfoSub::ref ispListener) override;
604 bool
605 unsubTransactions(std::uint64_t uListener) override;
606
607 bool
608 subRTTransactions(InfoSub::ref ispListener) override;
609 bool
610 unsubRTTransactions(std::uint64_t uListener) override;
611
612 bool
613 subValidations(InfoSub::ref ispListener) override;
614 bool
615 unsubValidations(std::uint64_t uListener) override;
616
617 bool
618 subPeerStatus(InfoSub::ref ispListener) override;
619 bool
620 unsubPeerStatus(std::uint64_t uListener) override;
621 void
622 pubPeerStatus(std::function<json::Value(void)> const&) override;
623
624 bool
625 subConsensus(InfoSub::ref ispListener) override;
626 bool
627 unsubConsensus(std::uint64_t uListener) override;
628
630 findRpcSub(std::string const& strUrl) override;
632 addRpcSub(std::string const& strUrl, InfoSub::ref) override;
633 bool
634 tryRemoveRpcSub(std::string const& strUrl) override;
635
// Returns the journal_ member by const reference.
636 beast::Journal const&
637 journal() const override
638 {
639 return journal_;
640 }
641
642 void
643 stop() override
644 {
645 {
646 try
647 {
648 heartbeatTimer_.cancel();
649 }
650 catch (boost::system::system_error const& e)
651 {
652 JLOG(journal_.error()) << "NetworkOPs: heartbeatTimer cancel error: " << e.what();
653 }
654
655 try
656 {
657 clusterTimer_.cancel();
658 }
659 catch (boost::system::system_error const& e)
660 {
661 JLOG(journal_.error()) << "NetworkOPs: clusterTimer cancel error: " << e.what();
662 }
663
664 try
665 {
666 accountHistoryTxTimer_.cancel();
667 }
668 catch (boost::system::system_error const& e)
669 {
670 JLOG(journal_.error())
671 << "NetworkOPs: accountHistoryTxTimer cancel error: " << e.what();
672 }
673 }
674 // Make sure that any waitHandlers pending in our timers are done.
675 using namespace std::chrono_literals;
676 waitHandlerCounter_.join("NetworkOPs", 1s, journal_);
677 }
678
679 void
680 stateAccounting(json::Value& obj) override;
681
682private:
683 void
684 setTimer(
685 boost::asio::steady_timer& timer,
686 std::chrono::milliseconds const& expiryTime,
687 std::function<void()> onExpire,
688 std::function<void()> onError);
689 void
691 void
693 void
695 void
697
699 transJson(
700 std::shared_ptr<STTx const> const& transaction,
701 TER result,
702 bool validated,
705
706 void
709 AcceptedLedgerTx const& transaction,
710 bool last);
711
712 void
715 AcceptedLedgerTx const& transaction,
716 bool last);
717
741 void
742 pubBookTransaction(AcceptedLedgerTx const& transaction, MultiApiJson const& jvObj);
743
744 void
747 std::shared_ptr<STTx const> const& transaction,
748 TER result);
749
750 void
751 pubServer();
752 void
754
756 getHostId(bool forAdmin);
757
758private:
762
763 /*
764 * With a validated ledger to separate history and future, the node
765 * streams historical txns with negative indexes starting from -1,
766 * and streams future txns starting from index 0.
767 * The SubAccountHistoryIndex struct maintains these indexes.
768 * It also has a flag stopHistorical_ for stopping streaming
769 * the historical txns.
770 */
800
804 void
808 void
810 void
812
815
817
819
821
826
828 boost::asio::steady_timer heartbeatTimer_;
829 boost::asio::steady_timer clusterTimer_;
830 boost::asio::steady_timer accountHistoryTxTimer_;
831
833
836
838
840
850
854
856
858
// One enumerator per kind of subscription stream (see the per-entry notes).
859 // Used as array indices; converting to enum class would require casts at ~40 call sites.
860 // NOLINTNEXTLINE(cppcoreguidelines-use-enum-class)
861 enum SubTypes {
862 SLedger, // Accepted ledgers.
863 SManifests, // Received validator manifests.
864 SServer, // When server changes connectivity state.
865 STransactions, // All accepted transactions.
866 SRtTransactions, // All proposed and accepted transactions.
867 SValidations, // Received validations.
868 SPeerStatus, // Peer status changes.
869 SConsensusPhase, // Consensus phase changes.
870 SBookChanges, // Per-ledger order book changes.
871 SLastEntry // Sentinel: any new entry must be ADDED ABOVE this one.
872 };
873
875
877
879
880 // Whether we are in standalone mode.
881 bool const standalone_;
882
883 // The number of nodes that we need to consider ourselves connected.
885
886 // Transaction batching.
891
893
896
897private:
898 struct Stats
899 {
900 template <class Handler>
901 Stats(Handler const& handler, beast::insight::Collector::ptr const& collector)
902 : hook(collector->makeHook(handler))
904 collector->makeGauge("State_Accounting", "Disconnected_duration"))
905 , connectedDuration(collector->makeGauge("State_Accounting", "Connected_duration"))
906 , syncingDuration(collector->makeGauge("State_Accounting", "Syncing_duration"))
907 , trackingDuration(collector->makeGauge("State_Accounting", "Tracking_duration"))
908 , fullDuration(collector->makeGauge("State_Accounting", "Full_duration"))
910 collector->makeGauge("State_Accounting", "Disconnected_transitions"))
912 collector->makeGauge("State_Accounting", "Connected_transitions"))
913 , syncingTransitions(collector->makeGauge("State_Accounting", "Syncing_transitions"))
914 , trackingTransitions(collector->makeGauge("State_Accounting", "Tracking_transitions"))
915 , fullTransitions(collector->makeGauge("State_Accounting", "Full_transitions"))
916 {
917 }
918
925
931 };
932
933 std::mutex statsMutex_; // Mutex to lock stats_
935
936private:
937 void
939};
940
941//------------------------------------------------------------------------------
942
944 {"disconnected", "connected", "syncing", "tracking", "full"}};
945
947
954
955static auto const kGenesisAccountId =
957
958//------------------------------------------------------------------------------
959inline OperatingMode
961{
962 return mode_;
963}
964
965inline std::string
966NetworkOPsImp::strOperatingMode(bool const admin /* = false */) const
967{
968 return strOperatingMode(mode_, admin);
969}
970
971inline void
976
977inline void
982
983inline void
988
989inline bool
994
995inline bool
1000
1003{
    // NOTE(review): the signature lines are not visible in this listing;
    // presumably this is NetworkOPsImp::getHostId(bool forAdmin) — confirm.
    // Function-local static: the host name is queried once, on first call.
1004 static std::string const kHostname = boost::asio::ip::host_name();
1005
    // Admin callers receive the real host name.
1006 if (forAdmin)
1007 return kHostname;
1008
1009 // For non-admin uses hash the node public key into a
1010 // single RFC1751 word:
    // Also a function-local static, so the word is computed on first use
    // only and the same string is returned on later calls.
1011 static std::string const kShroudedHostId = [this]() {
1012 auto const& id = registry_.get().getApp().nodeIdentity();
1013
1014 return RFC1751::getWordFromBlob(id.first.data(), id.first.size());
1015 }();
1016
1017 return kShroudedHostId;
1018}
1019
1020void
1022{
1024
1025 // Only do this work if a cluster is configured
1026 if (registry_.get().getCluster().size() != 0)
1028}
1029
// Arms `timer` to fire after `expiryTime` with a completion handler wrapped
// by waitHandlerCounter_.
//   timer      - the steady_timer to arm.
//   expiryTime - delay handed to timer.expires_after().
//   onExpire   - invoked when the wait completes with success and jobQueue_
//                is not stopped.
//   onError    - invoked, after logging, when the wait fails with any error
//                other than operation_aborted.
// If waitHandlerCounter_.wrap() yields no handler, the timer is not started.
// NOTE(review): the line carrying the function name is not visible in this
// listing; presumably NetworkOPsImp::setTimer( — confirm.
1030void
1032 boost::asio::steady_timer& timer,
1033 std::chrono::milliseconds const& expiryTime,
1034 std::function<void()> onExpire,
1035 std::function<void()> onError)
1036{
1037 // Only start the timer if waitHandlerCounter_ is not yet joined.
1038 if (auto optionalCountedHandler =
1039 waitHandlerCounter_.wrap([this, onExpire, onError](boost::system::error_code const& e) {
    // Normal expiry: run the caller's callback unless the job queue has stopped.
1040 if ((e.value() == boost::system::errc::success) && (!jobQueue_.isStopped()))
1041 {
1042 onExpire();
1043 }
1044 // Recover as best we can if an unexpected error occurs.
    // operation_aborted is excluded here, so it triggers neither callback.
1045 if (e.value() != boost::system::errc::success &&
1046 e.value() != boost::asio::error::operation_aborted)
1047 {
1048 // Try again later and hope for the best.
1049 JLOG(journal_.error())
1050 << "Timer got error '" << e.message() << "'. Restarting timer.";
1051 onError();
1052 }
1053 }))
1054 {
    // The wrapped handler is moved into the asynchronous wait.
1055 timer.expires_after(expiryTime);
1056 timer.async_wait(std::move(*optionalCountedHandler));
1057 }
1058}
1059
// Schedules the next heartbeat via setTimer(): after
// consensus_.parms().ledgerGRANULARITY a "NetHeart" job is queued that runs
// processHeartbeatTimer(); on a timer error setHeartbeatTimer() is called again.
// NOTE(review): the function-name line and the first setTimer() argument are
// not visible in this listing; presumably this is
// NetworkOPsImp::setHeartbeatTimer() arming heartbeatTimer_ — confirm.
1060void
1062{
1063 setTimer(
1065 consensus_.parms().ledgerGRANULARITY,
1066 [this]() {
1067 jobQueue_.addJob(JtNetopTimer, "NetHeart", [this]() { processHeartbeatTimer(); });
1068 },
1069 [this]() { setHeartbeatTimer(); });
1070}
1071
// Schedules the next cluster report via setTimer(): after 10 seconds a
// "NetCluster" job is queued that runs processClusterTimer(); on a timer
// error setClusterTimer() is called again.
// NOTE(review): the function-name line and the first setTimer() argument are
// not visible in this listing; presumably this is
// NetworkOPsImp::setClusterTimer() arming clusterTimer_ — confirm.
1072void
1074{
1075 using namespace std::chrono_literals;
1076
1077 setTimer(
1079 10s,
1080 [this]() {
1081 jobQueue_.addJob(JtNetopCluster, "NetCluster", [this]() { processClusterTimer(); });
1082 },
1083 [this]() { setClusterTimer(); });
1084}
1085
// Logs the account taken from subInfo.index->accountId, then uses setTimer()
// with a 4 second delay: on expiry addAccountHistoryJob(subInfo) is called,
// on a timer error setAccountHistoryJobTimer(subInfo) is called again.
// Both lambdas capture subInfo by copy.
// NOTE(review): the function-name line (with the subInfo parameter) and the
// first setTimer() argument are not visible in this listing; presumably this
// is NetworkOPsImp::setAccountHistoryJobTimer arming accountHistoryTxTimer_
// — confirm.
1086void
1088{
1089 JLOG(journal_.debug()) << "Scheduling AccountHistory job for account "
1090 << toBase58(subInfo.index->accountId);
1091 using namespace std::chrono_literals;
1092 setTimer(
1094 4s,
1095 [this, subInfo]() { addAccountHistoryJob(subInfo); },
1096 [this, subInfo]() { setAccountHistoryJobTimer(subInfo); });
1097}
1098
// Heartbeat job body: pings the LoadManager, compares the peer count with
// minPeerCount_, and — when there are enough peers — drives
// consensus_.timerEntry() and reports consensus phase changes.
// NOTE(review): several lines of this function (its name line, some
// conditions and mode changes, and the final statement) are not visible in
// this listing; presumably this is NetworkOPsImp::processHeartbeatTimer()
// — confirm. Comments below describe only the visible statements.
1099void
1101{
1102 RclConsensusLogger clog("Heartbeat Timer", consensus_.validating(), journal_);
1103 {
    // The master mutex is held for this inner scope (released early on the
    // too-few-peers path below).
1104 std::unique_lock lock{registry_.get().getApp().getMasterMutex()};
1105
1106 // VFALCO NOTE This is for diagnosing a crash on exit
1107 LoadManager& mgr(registry_.get().getLoadManager());
1108 mgr.heartbeat();
1109
1110 std::size_t const numPeers = registry_.get().getOverlay().size();
1111
1112 // do we have sufficient peers? If not, we are disconnected.
1113 if (numPeers < minPeerCount_)
1114 {
1116 {
1119 ss << "Node count (" << numPeers << ") has fallen "
1120 << "below required minimum (" << minPeerCount_ << ").";
1121 JLOG(journal_.warn()) << ss.str();
1122 CLOG(clog.ss()) << "set mode to DISCONNECTED: " << ss.str();
1123 }
1124 else
1125 {
1126 CLOG(clog.ss()) << "already DISCONNECTED. too few peers (" << numPeers
1127 << "), need at least " << minPeerCount_;
1128 }
1129
1130 // MasterMutex lock need not be held to call setHeartbeatTimer()
1131 lock.unlock();
1132 // We do not call consensus_.timerEntry until there are enough
1133 // peers providing meaningful inputs to consensus
1135
    // Early exit: consensus_.timerEntry() below is skipped on this path.
1136 return;
1137 }
1138
1140 {
1142 JLOG(journal_.info()) << "Node count (" << numPeers << ") is sufficient.";
1143 CLOG(clog.ss()) << "setting mode to CONNECTED based on " << numPeers << " peers. ";
1144 }
1145
1146 // Check if the last validated ledger forces a change between these
1147 // states.
    // The mode is sampled before and after so that a change can be logged.
1148 auto origMode = mode_.load();
1149 CLOG(clog.ss()) << "mode: " << strOperatingMode(origMode, true);
1151 {
1153 }
1154 else if (mode_ == OperatingMode::CONNECTED)
1155 {
1157 }
1158 auto newMode = mode_.load();
1159 if (origMode != newMode)
1160 {
1161 CLOG(clog.ss()) << ", changing to " << strOperatingMode(newMode, true);
1162 }
1163 CLOG(clog.ss()) << ". ";
1164 }
1165
    // Runs after the scope holding the master-mutex lock has ended.
1166 consensus_.timerEntry(registry_.get().getTimeKeeper().closeTime(), clog.ss());
1167
    // Report a consensus phase change, if any, and remember the new phase.
1168 CLOG(clog.ss()) << "consensus phase " << to_string(lastConsensusPhase_);
1169 ConsensusPhase const currPhase = consensus_.phase();
1170 if (lastConsensusPhase_ != currPhase)
1171 {
1172 reportConsensusStateChange(currPhase);
1173 lastConsensusPhase_ = currPhase;
1174 CLOG(clog.ss()) << " changed to " << to_string(lastConsensusPhase_);
1175 }
1176 CLOG(clog.ss()) << ". ";
1177
1179}
1180
// Cluster job body: records this node's own status in the cluster table and,
// if that update is accepted, sends a TMCluster message (cluster nodes plus
// exported resource consumers) to peers matching PeerInCluster().
// NOTE(review): the function-name line and two other lines are not visible
// in this listing; presumably this is NetworkOPsImp::processClusterTimer()
// — confirm.
1181void
1183{
    // Nothing to do when the cluster has no members.
1184 if (registry_.get().getCluster().size() == 0)
1185 return;
1186
1187 using namespace std::chrono_literals;
1188
    // The local fee is reported only while the validated ledger is at most
    // 4 minutes old; otherwise 0 is reported.
1189 bool const update = registry_.get().getCluster().update(
1190 registry_.get().getApp().nodeIdentity().first,
1191 "",
1192 (ledgerMaster_.getValidatedLedgerAge() <= 4min)
1193 ? registry_.get().getFeeTrack().getLocalFee()
1194 : 0,
1195 registry_.get().getTimeKeeper().now());
1196
1197 if (!update)
1198 {
1199 JLOG(journal_.debug()) << "Too soon to send cluster update";
1201 return;
1202 }
1203
    // One TMClusterNode entry per known cluster node; the name is set only
    // when it is non-empty.
1204 protocol::TMCluster cluster;
1205 registry_.get().getCluster().forEach([&cluster](ClusterNode const& node) {
1206 protocol::TMClusterNode& n = *cluster.add_clusternodes();
1207 n.set_publickey(toBase58(TokenType::NodePublic, node.identity()));
1208 n.set_reporttime(node.getReportTime().time_since_epoch().count());
1209 n.set_nodeload(node.getLoadFee());
1210 if (!node.name().empty())
1211 n.set_nodename(node.name());
1212 });
1213
    // One TMLoadSource entry (address and balance) per exported gossip item.
1214 Resource::Gossip const gossip = registry_.get().getResourceManager().exportConsumers();
1215 for (auto& item : gossip.items)
1216 {
1217 protocol::TMLoadSource& node = *cluster.add_loadsources();
1218 node.set_name(to_string(item.address));
1219 node.set_cost(item.balance);
1220 }
1221 registry_.get().getOverlay().foreach(
1222 sendIf(std::make_shared<Message>(cluster, protocol::mtCLUSTER), PeerInCluster()));
1224}
1225
1226//------------------------------------------------------------------------------
1227
1229NetworkOPsImp::strOperatingMode(OperatingMode const mode, bool const admin) const
1230{
1231 if (mode == OperatingMode::FULL && admin)
1232 {
1233 auto const consensusMode = consensus_.mode();
1234 if (consensusMode != ConsensusMode::WrongLedger)
1235 {
1236 if (consensusMode == ConsensusMode::Proposing)
1237 return "proposing";
1238
1239 if (consensus_.validating())
1240 return "validating";
1241 }
1242 }
1243
1244 return kStates[static_cast<std::size_t>(mode)];
1245}
1246
// Screens a submitted transaction (`iTrans`) and, if it passes, queues a
// "SubmitTxn" job that hands it to processTransaction(). Every rejection
// path logs and returns without queueing anything.
// NOTE(review): the function-name line and the condition guarding the
// "cached bad" branch are not visible in this listing; presumably this is
// NetworkOPsImp::submitTransaction — confirm.
1247void
1249{
1250 if (isNeedNetworkLedger())
1251 {
1252 // Nothing we can do if we've never been in sync
1253 return;
1254 }
1255
1256 // Enforce Network bar for batch txn
    // Rejected only when the flag is set AND featureBatch is enabled in the
    // validated rules.
1257 if (iTrans->isFlag(tfInnerBatchTxn) && ledgerMaster_.getValidatedRules().enabled(featureBatch))
1258 {
1259 JLOG(journal_.error()) << "Submitted transaction invalid: tfInnerBatchTxn flag present.";
1260 return;
1261 }
1262
1263 // this is an asynchronous interface
    // From here on the sterilized copy `trans` is used instead of iTrans.
1264 auto const trans = sterilize(*iTrans);
1265
1266 auto const txid = trans->getTransactionID();
1267 auto const flags = registry_.get().getHashRouter().getFlags(txid);
1268
1270 {
1271 JLOG(journal_.warn()) << "Submitted transaction cached bad";
1272 return;
1273 }
1274
    // Validity check against the validated rules; an exception thrown by the
    // check is logged and treated as a rejection.
1275 try
1276 {
1277 auto const [validity, reason] = checkValidity(
1278 registry_.get().getHashRouter(), *trans, ledgerMaster_.getValidatedRules());
1279
1280 if (validity != Validity::Valid)
1281 {
1282 JLOG(journal_.warn()) << "Submitted transaction invalid: " << reason;
1283 return;
1284 }
1285 }
1286 catch (std::exception const& ex)
1287 {
1288 JLOG(journal_.warn()) << "Exception checking transaction " << txid << ": " << ex.what();
1289
1290 return;
1291 }
1292
1293 std::string reason;
1294
1295 auto tx = std::make_shared<Transaction>(trans, reason, registry_.get().getApp());
1296
    // The lambda copies `tx` into a local because processTransaction() takes
    // a non-const reference to the shared_ptr.
1297 jobQueue_.addJob(JtTransaction, "SubmitTxn", [this, tx]() {
1298 auto t = tx;
1299 processTransaction(t, false, false, FailHard::No);
1300 });
1301}
1302
// Runs the checks a transaction must pass before it is queued for
// application. Returns false (after marking the transaction INVALID and
// setting a result code) when it is rejected; returns true after
// canonicalizing `transaction`, which may replace the caller's pointer.
// NOTE(review): the function-name line and the condition guarding the
// "cached bad" branch are not visible in this listing; callers refer to this
// function as preProcessTransaction(transaction) — confirm the signature.
1303bool
1305{
1306 auto const newFlags = registry_.get().getHashRouter().getFlags(transaction->getID());
1307
1309 {
1310 // cached bad
1311 JLOG(journal_.warn()) << transaction->getID() << ": cached bad!\n";
1312 transaction->setStatus(TransStatus::INVALID);
1313 transaction->setResult(temBAD_SIGNATURE);
1314 return false;
1315 }
1316
1317 auto const view = ledgerMaster_.getCurrentLedger();
1318
1319 // This function is called by several different parts of the codebase.
1320 // Under no circumstances will we ever accept an inner txn within a batch
1321 // txn from the network.
    // NOTE(review): `auto const sttx = *...` copies the pointed-to object; a
    // const reference might suffice since sttx is not used after
    // canonicalize() — confirm lifetime before changing.
1322 auto const sttx = *transaction->getSTransaction();
1323 if (sttx.isFlag(tfInnerBatchTxn) && view->rules().enabled(featureBatch))
1324 {
1325 transaction->setStatus(TransStatus::INVALID);
1326 transaction->setResult(temINVALID_FLAG);
1327 registry_.get().getHashRouter().setFlags(transaction->getID(), HashRouterFlags::BAD);
1328 return false;
1329 }
1330
1331 // NOTE ximinez - I think this check is redundant,
1332 // but I'm not 100% sure yet.
1333 // If so, only cost is looking up HashRouter flags.
1334 auto const [validity, reason] =
1335 checkValidity(registry_.get().getHashRouter(), sttx, view->rules());
    // NOTE(review): the assertion text names processTransaction although
    // callers invoke this function as preProcessTransaction — confirm
    // whether the message should be updated.
1336 XRPL_ASSERT(
1337 validity == Validity::Valid, "xrpl::NetworkOPsImp::processTransaction : valid validity");
1338
1339 // Not concerned with local checks at this point.
1340 if (validity == Validity::SigBad)
1341 {
1342 JLOG(journal_.info()) << "Transaction has bad signature: " << reason;
1343 transaction->setStatus(TransStatus::INVALID);
1344 transaction->setResult(temBAD_SIGNATURE);
1345 registry_.get().getHashRouter().setFlags(transaction->getID(), HashRouterFlags::BAD);
1346 return false;
1347 }
1348
1349 // canonicalize can change our pointer
1350 registry_.get().getMasterTransaction().canonicalize(&transaction);
1351
1352 return true;
1353}
1354
// Pre-processes `transaction` and then hands it to the synchronous path
// (bLocal == true) or the asynchronous path (bLocal == false).
//   transaction - taken by non-const reference; preProcessTransaction() may
//                 replace the pointer.
//   bUnlimited  - forwarded unchanged to doTransactionSync/Async.
//   bLocal      - selects doTransactionSync() over doTransactionAsync().
//   failType    - forwarded unchanged to doTransactionSync/Async.
// Returns without doing anything further when pre-processing rejects the
// transaction.
// NOTE(review): the function-name line is not visible in this listing;
// presumably NetworkOPsImp::processTransaction( — confirm.
1355void
1357 std::shared_ptr<Transaction>& transaction,
1358 bool bUnlimited,
1359 bool bLocal,
1360 FailHard failType)
1361{
    // `ev` is otherwise unused here; it lives until the function returns.
1362 auto ev = jobQueue_.makeLoadEvent(JtTxnProc, "ProcessTXN");
1363
1364 // preProcessTransaction can change our pointer
1365 if (!preProcessTransaction(transaction))
1366 return;
1367
1368 if (bLocal)
1369 {
1370 doTransactionSync(transaction, bUnlimited, failType);
1371 }
1372 else
1373 {
1374 doTransactionAsync(transaction, bUnlimited, failType);
1375 }
1376}
1377
// Appends `transaction` to transactions_ under mutex_ (unless it is already
// flagged as applying) and may queue a "TxBatchAsync" job that runs
// transactionBatch(). The entry is created with admin = bUnlimited and
// local = false.
// NOTE(review): the function-name line, the condition guarding the addJob()
// call and the statement executed when addJob() succeeds are not visible in
// this listing; presumably this is NetworkOPsImp::doTransactionAsync(
// — confirm.
1378void
1380 std::shared_ptr<Transaction> transaction,
1381 bool bUnlimited,
1382 FailHard failType)
1383{
1384 std::scoped_lock const lock(mutex_);
1385
    // Already queued/being applied: nothing to add.
1386 if (transaction->getApplying())
1387 return;
1388
1389 transactions_.emplace_back(transaction, bUnlimited, false, failType);
1390 transaction->setApplying();
1391
1393 {
1394 if (jobQueue_.addJob(JtBatch, "TxBatchAsync", [this]() { transactionBatch(); }))
1395 {
1397 }
1398 }
1399}
1400
// Queues `transaction` (unless it is already flagged as applying) with
// admin = bUnlimited and local = true, then calls doTransactionSyncBatch()
// with a retry callback that keeps looping while the transaction still
// reports getApplying().
// NOTE(review): the function-name line and the declaration of `lock` are not
// visible in this listing; presumably this is
// NetworkOPsImp::doTransactionSync( and `lock` is a
// std::unique_lock<std::mutex> — confirm.
1401void
1403 std::shared_ptr<Transaction> transaction,
1404 bool bUnlimited,
1405 FailHard failType)
1406{
1408
1409 if (!transaction->getApplying())
1410 {
1411 transactions_.emplace_back(transaction, bUnlimited, true, failType);
1412 transaction->setApplying();
1413 }
1414
    // The callback captures `transaction` by reference; it is only invoked
    // during this call.
1415 doTransactionSyncBatch(lock, [&transaction](std::unique_lock<std::mutex> const&) {
1416 return transaction->getApplying();
1417 });
1418}
1419
// Loops until retryCallback(lock) returns false. Each pass either waits on
// cond_ or calls apply(lock); after applying, a "TxBatchSync" job may be
// queued for transactions that are still pending.
//   retryCallback - evaluated after every pass with the lock; returning true
//                   requests another pass.
// NOTE(review): the function-name line (with the `lock` parameter), the
// condition selecting the wait branch and the statement executed when
// addJob() succeeds are not visible in this listing; presumably this is
// NetworkOPsImp::doTransactionSyncBatch( — confirm.
1420void
1423 std::function<bool(std::unique_lock<std::mutex> const&)> retryCallback)
1424{
1425 do
1426 {
1428 {
1429 // A batch processing job is already running, so wait.
1430 cond_.wait(lock);
1431 }
1432 else
1433 {
1434 apply(lock);
1435
1436 if (!transactions_.empty())
1437 {
1438 // More transactions need to be applied, but by another job.
1439 if (jobQueue_.addJob(JtBatch, "TxBatchSync", [this]() { transactionBatch(); }))
1440 {
1442 }
1443 }
1444 }
1445 } while (retryCallback(lock));
1446}
1447
// Builds Transaction objects for every entry of `set`, drops the ones that
// are invalid or fail preProcessTransaction(), adds the survivors to
// transactions_ and then waits (via a retry callback) until no queued
// transaction reports getApplying().
// NOTE(review): the function-name line, the declarations of `candidates`,
// `transactions` and `lock`, the statement in the empty-queue branch and the
// opening of the final call are not visible in this listing; presumably this
// is NetworkOPsImp::processTransactionSet( — confirm.
1448void
1450{
1451 auto ev = jobQueue_.makeLoadEvent(JtTxnProc, "ProcessTXNSet");
1453 candidates.reserve(set.size());
1454 for (auto const& [_, tx] : set)
1455 {
1456 std::string reason;
1457 auto transaction = std::make_shared<Transaction>(tx, reason, registry_.get().getApp());
1458
    // A transaction that is already INVALID after construction is flagged
    // BAD in the HashRouter and skipped.
1459 if (transaction->getStatus() == TransStatus::INVALID)
1460 {
1461 if (!reason.empty())
1462 {
1463 JLOG(journal_.trace()) << "Exception checking transaction: " << reason;
1464 }
1465 registry_.get().getHashRouter().setFlags(tx->getTransactionID(), HashRouterFlags::BAD);
1466 continue;
1467 }
1468
1469 // preProcessTransaction can change our pointer
1470 if (!preProcessTransaction(transaction))
1471 continue;
1472
1473 candidates.emplace_back(transaction);
1474 }
1475
1477 transactions.reserve(candidates.size());
1478
1480
    // Only candidates not already flagged as applying are queued; they are
    // created with admin = false, local = false and FailHard::No.
1481 for (auto& transaction : candidates)
1482 {
1483 if (!transaction->getApplying())
1484 {
1485 transactions.emplace_back(transaction, false, false, FailHard::No);
1486 transaction->setApplying();
1487 }
1488 }
1489
1490 if (transactions_.empty())
1491 {
1493 }
1494 else
1495 {
    // Append to the existing queue, moving each element.
1496 transactions_.reserve(transactions_.size() + transactions.size());
1497 for (auto& t : transactions)
1498 transactions_.push_back(std::move(t));
1499 }
1500 if (transactions_.empty())
1501 {
1502 JLOG(journal_.debug()) << "No transaction to process!";
1503 return;
1504 }
1505
    // Retry callback body: keep going while any queued transaction still
    // reports getApplying().
1507 XRPL_ASSERT(lock.owns_lock(), "xrpl::NetworkOPsImp::processTransactionSet has lock");
1508 return std::ranges::any_of(
1509 transactions_, [](auto const& t) { return t.transaction->getApplying(); });
1510 });
1511}
1512
// Drains transactions_ by calling apply(lock) repeatedly until the queue is
// empty.
// NOTE(review): the signature, the creation of `lock`, and the condition for
// the early return are on lines that are not present in this listing.
1513void
1515{
1517
1519 return;
1520
1521 while (!transactions_.empty())
1522 {
1523 apply(lock);
1524 }
1525}
1526
// Applies a batch of transactions to the open ledger and records the outcome
// of each one.
//
// Visible flow:
//  1. batchLock is released while the batch is processed.
//  2. With the master mutex and the ledger master mutex held, each entry is
//     run through TxQ::apply inside OpenLedger::modify; the result code and
//     the applied flag are stored on the entry.
//  3. For every entry: the result is published/recorded, the transaction
//     status is set according to the result class, held transactions are
//     handed to ledgerMaster_, local transactions may be pushed to localTX_,
//     and eligible transactions are relayed to peers.
//  4. batchLock is re-acquired, the applying flag is cleared on every entry,
//     follow-up transactions collected in submitHeld are appended to
//     transactions_, and waiters on cond_ are notified.
//
// NOTE(review): the signature, the declarations of `submitHeld` and
// `transactions`, the dispatchState_ updates, the loop header inside the
// modify() callback, the statement executed when `changed` is true, and the
// else-arm of the `ledgersLeft` expression are on lines that are not present
// in this listing.
1527void
1529{
1533 XRPL_ASSERT(!transactions.empty(), "xrpl::NetworkOPsImp::apply : non-empty transactions");
1534 XRPL_ASSERT(
1535 dispatchState_ != DispatchState::Running, "xrpl::NetworkOPsImp::apply : is not running");
1536
1538
1539 batchLock.unlock();
1540
1541 {
1542 std::unique_lock masterLock{registry_.get().getApp().getMasterMutex(), std::defer_lock};
1543 bool changed = false;
1544 {
1545 std::unique_lock ledgerLock{ledgerMaster_.peekMutex(), std::defer_lock};
 // Both locks were constructed deferred; std::lock acquires them together.
1546 std::lock(masterLock, ledgerLock);
1547 registry_.get().getOpenLedger().modify([&](OpenView& view, beast::Journal j) {
1549 {
1550 // we check before adding to the batch
1551 ApplyFlags flags = TapNone;
1552 if (e.admin)
1553 flags |= TapUnlimited;
1554
1555 if (e.failType == FailHard::Yes)
1556 flags |= TapFailHard;
1557
1558 auto const result = registry_.get().getTxQ().apply(
1559 registry_.get().getApp(), view, e.transaction->getSTransaction(), flags, j);
1560 e.result = result.ter;
1561 e.applied = result.applied;
1562 changed = changed || result.applied;
1563 }
1564 return changed;
1565 });
1566 }
1567 if (changed)
1569
1570 std::optional<LedgerIndex> validatedLedgerIndex;
1571 if (auto const l = ledgerMaster_.getValidatedLedger())
1572 validatedLedgerIndex = l->header().seq;
1573
1574 auto newOL = registry_.get().getOpenLedger().current();
1575 for (TransactionStatus const& e : transactions)
1576 {
1577 e.transaction->clearSubmitResult();
1578
1579 if (e.applied)
1580 {
1581 pubProposedTransaction(newOL, e.transaction->getSTransaction(), e.result);
1582 e.transaction->setApplied();
1583 }
1584
1585 e.transaction->setResult(e.result);
1586
 // Malformed transactions are flagged BAD in the hash router.
1587 if (isTemMalformed(e.result))
1588 {
1589 registry_.get().getHashRouter().setFlags(
1590 e.transaction->getID(), HashRouterFlags::BAD);
1591 }
1592
1593#ifdef DEBUG
1594 if (!isTesSuccess(e.result))
1595 {
1596 std::string token, human;
1597
1598 if (transResultInfo(e.result, token, human))
1599 {
1600 JLOG(journal_.info()) << "TransactionResult: " << token << ": " << human;
1601 }
1602 }
1603#endif
1604
1605 bool const addLocal = e.local;
1606
1607 if (isTesSuccess(e.result))
1608 {
1609 JLOG(journal_.debug()) << "Transaction is now included in open ledger";
1610 e.transaction->setStatus(TransStatus::INCLUDED);
1611
1612 // Pop as many "reasonable" transactions for this account as
1613 // possible. "Reasonable" means they have sequential sequence
1614 // numbers, or use tickets.
1615 auto const& txCur = e.transaction->getSTransaction();
1616
1617 std::size_t count = 0;
1618 for (auto txNext = ledgerMaster_.popAcctTransaction(txCur);
1619 txNext && count < kMaxPoppedTransactions;
1620 txNext = ledgerMaster_.popAcctTransaction(txCur), ++count)
1621 {
 // batchLock is taken only once a follow-up transaction has
 // actually been popped, and released again after the loop.
1622 if (!batchLock.owns_lock())
1623 batchLock.lock();
1624 std::string reason;
1625 auto const trans = sterilize(*txNext);
1626 auto t = std::make_shared<Transaction>(trans, reason, registry_.get().getApp());
1627 if (t->getApplying())
1628 break;
1629 submitHeld.emplace_back(t, false, false, FailHard::No);
1630 t->setApplying();
1631 }
1632 if (batchLock.owns_lock())
1633 batchLock.unlock();
1634 }
1635 else if (e.result == tefPAST_SEQ)
1636 {
1637 // duplicate or conflict
1638 JLOG(journal_.info()) << "Transaction is obsolete";
1639 e.transaction->setStatus(TransStatus::OBSOLETE);
1640 }
1641 else if (e.result == terQUEUED)
1642 {
1643 JLOG(journal_.debug()) << "Transaction is likely to claim a"
1644 << " fee, but is queued until fee drops";
1645
1646 e.transaction->setStatus(TransStatus::HELD);
1647 // Add to held transactions, because it could get
1648 // kicked out of the queue, and this will try to
1649 // put it back.
1650 ledgerMaster_.addHeldTransaction(e.transaction);
1651 e.transaction->setQueued();
1652 e.transaction->setKept();
1653 }
1654 else if (isTerRetry(e.result) || isTelLocal(e.result) || isTefFailure(e.result))
1655 {
 // Fail-hard transactions are never held for retry.
1656 if (e.failType != FailHard::Yes)
1657 {
1658 auto const lastLedgerSeq =
1659 e.transaction->getSTransaction()->at(~sfLastLedgerSequence);
1660 auto const ledgersLeft = lastLedgerSeq
1661 ? *lastLedgerSeq - ledgerMaster_.getCurrentLedgerIndex()
1663 // If any of these conditions are met, the transaction can
1664 // be held:
1665 // 1. It was submitted locally. (Note that this flag is only
1666 // true on the initial submission.)
1667 // 2. The transaction has a LastLedgerSequence, and the
1668 // LastLedgerSequence is fewer than LocalTxs::kHoldLedgers
1669 // (5) ledgers into the future. (Remember that an
1670 // unseated optional compares as less than all seated
1671 // values, so it has to be checked explicitly first.)
1672 // 3. The HashRouterFlags::BAD flag is not set on the txID.
1673 // (setFlags
1674 // checks before setting. If the flag is set, it returns
1675 // false, which means it's been held once without one of
1676 // the other conditions, so don't hold it again. Time's
1677 // up!)
1678 //
 // NOTE(review): item 3 above names HashRouterFlags::BAD, but the
 // call below sets HashRouterFlags::HELD -- confirm which is meant.
1679 if (e.local || (ledgersLeft && ledgersLeft <= LocalTxs::kHoldLedgers) ||
1680 registry_.get().getHashRouter().setFlags(
1681 e.transaction->getID(), HashRouterFlags::HELD))
1682 {
1683 // transaction should be held
1684 JLOG(journal_.debug()) << "Transaction should be held: " << e.result;
1685 e.transaction->setStatus(TransStatus::HELD);
1686 ledgerMaster_.addHeldTransaction(e.transaction);
1687 e.transaction->setKept();
1688 }
1689 else
1690 {
1691 JLOG(journal_.debug())
1692 << "Not holding transaction " << e.transaction->getID() << ": "
1693 << (e.local ? "local" : "network") << ", "
1694 << "result: " << e.result << " ledgers left: "
1695 << (ledgersLeft ? to_string(*ledgersLeft) : "unspecified");
1696 }
1697 }
1698 }
1699 else
1700 {
1701 JLOG(journal_.debug()) << "Status other than success " << e.result;
1702 e.transaction->setStatus(TransStatus::INVALID);
1703 }
1704
 // A fail-hard transaction that did not succeed is neither kept locally
 // nor relayed.
1705 auto const enforceFailHard = e.failType == FailHard::Yes && !isTesSuccess(e.result);
1706
1707 if (addLocal && !enforceFailHard)
1708 {
1709 localTX_->pushBack(
1710 ledgerMaster_.getCurrentLedgerIndex(), e.transaction->getSTransaction());
1711 e.transaction->setKept();
1712 }
1713
 // Relay when the transaction was applied, was queued, or is a local
 // non-fail-hard submission while this server is not in FULL mode.
1714 if ((e.applied ||
1715 ((mode_ != OperatingMode::FULL) && (e.failType != FailHard::Yes) && e.local) ||
1716 (e.result == terQUEUED)) &&
1717 !enforceFailHard)
1718 {
1719 auto const toSkip =
1720 registry_.get().getHashRouter().shouldRelay(e.transaction->getID());
1721 if (auto const sttx = *(e.transaction->getSTransaction()); toSkip &&
1722 // Skip relaying if it's an inner batch txn. The flag should
1723 // only be set if the Batch feature is enabled. If Batch is
1724 // not enabled, the flag is always invalid, so don't relay
1725 // it regardless.
1726 !(sttx.isFlag(tfInnerBatchTxn)))
1727 {
1728 protocol::TMTransaction tx;
1729 Serializer s;
1730
1731 sttx.add(s);
1732 tx.set_rawtransaction(s.data(), s.size());
1733 tx.set_status(protocol::tsCURRENT);
1734 tx.set_receivetimestamp(
1735 registry_.get().getTimeKeeper().now().time_since_epoch().count());
1736 tx.set_deferred(e.result == terQUEUED);
1737 // FIXME: This should be when we received it
1738 registry_.get().getOverlay().relay(e.transaction->getID(), tx, *toSkip);
1739 e.transaction->setBroadcast();
1740 }
1741 }
1742
 // Only recorded when a validated ledger was available above.
1743 if (validatedLedgerIndex)
1744 {
1745 auto [fee, accountSeq, availableSeq] =
1746 registry_.get().getTxQ().getTxRequiredFeeAndSeq(
1747 *newOL, e.transaction->getSTransaction());
1748 e.transaction->setCurrentLedgerState(
1749 *validatedLedgerIndex, fee, accountSeq, availableSeq);
1750 }
1751 }
1752 }
1753
1754 batchLock.lock();
1755
1756 for (TransactionStatus const& e : transactions)
1757 e.transaction->clearApplying();
1758
 // Queue the follow-up transactions gathered during this pass.
1759 if (!submitHeld.empty())
1760 {
1761 if (transactions_.empty())
1762 {
1763 transactions_.swap(submitHeld);
1764 }
1765 else
1766 {
1767 transactions_.reserve(transactions_.size() + submitHeld.size());
1768 for (auto& e : submitHeld)
1769 transactions_.push_back(std::move(e));
1770 }
1771 }
1772
 // Wake any threads waiting on cond_.
1773 cond_.notify_all();
1774
1776}
1777
1778//
1779// Owner functions
1780//
1781
// Walks the owner directory of `account` in `lpLedger`, page by page, and
// collects the entries into jvObjects: offers are appended to
// jvObjects[jss::offers] and trust lines to jvObjects[jss::ripple_lines].
// Any other entry type hits UNREACHABLE. Returns jvObjects.
// NOTE(review): the return type, signature, and the declaration of jvObjects
// are on lines that are not present in this listing.
1784{
1786 auto root = keylet::ownerDir(account);
1787 auto sleNode = lpLedger->read(keylet::page(root));
1788 if (sleNode)
1789 {
1790 std::uint64_t uNodeDir = 0;
1791
1792 do
1793 {
1794 for (auto const& uDirEntry : sleNode->getFieldV256(sfIndexes))
1795 {
1796 auto sleCur = lpLedger->read(keylet::child(uDirEntry));
1797 XRPL_ASSERT(sleCur, "xrpl::NetworkOPsImp::getOwnerInfo : non-null child SLE");
1798
1799 switch (sleCur->getType())
1800 {
1801 case ltOFFER:
 // Create the array lazily on the first offer.
1802 if (!jvObjects.isMember(jss::offers))
1803 jvObjects[jss::offers] = json::Value(json::ValueType::Array);
1804
1805 jvObjects[jss::offers].append(sleCur->getJson(JsonOptions::Values::None));
1806 break;
1807
1808 case ltRIPPLE_STATE:
 // Create the array lazily on the first trust line.
1809 if (!jvObjects.isMember(jss::ripple_lines))
1810 {
1811 jvObjects[jss::ripple_lines] = json::Value(json::ValueType::Array);
1812 }
1813
1814 jvObjects[jss::ripple_lines].append(
1815 sleCur->getJson(JsonOptions::Values::None));
1816 break;
1817
1818 case ltACCOUNT_ROOT:
1819 case ltDIR_NODE:
1820 // LCOV_EXCL_START
1821 default:
1822 UNREACHABLE(
1823 "xrpl::NetworkOPsImp::getOwnerInfo : invalid "
1824 "type");
1825 break;
1826 // LCOV_EXCL_STOP
1827 }
1828 }
1829
 // sfIndexNext of zero ends the walk.
1830 uNodeDir = sleNode->getFieldU64(sfIndexNext);
1831
1832 if (uNodeDir != 0u)
1833 {
1834 sleNode = lpLedger->read(keylet::page(root, uNodeDir));
1835 XRPL_ASSERT(sleNode, "xrpl::NetworkOPsImp::getOwnerInfo : read next page");
1836 }
1837 } while (uNodeDir != 0u);
1838 }
1839
1840 return jvObjects;
1841}
1842
1843//
1844// Other
1845//
1846
1847inline bool
1852
1853inline bool
1858
1859void
1865
1866inline bool
1871
1872inline void
1877
1878inline void
1883
// Returns the current value of unlBlocked_.
// NOTE(review): the line carrying the function name is not present in this
// listing.
1884inline bool
1886{
1887 return unlBlocked_;
1888}
1889
1890void
1896
1897inline void
1902
// Decides whether this server should move off its own last closed ledger.
//
// Writes the chosen ledger hash to `networkClosed`. Returns false when there
// is no closed ledger, when the preferred ledger is our own (or our own
// previous ledger), or when the candidate ledger is available but fails the
// canBeCurrent/isCompatible checks. Returns true after deciding to switch;
// switchLastClosedLedger is only called if the target ledger is in hand.
// NOTE(review): the signature, the declaration of peerCounts, the condition
// guarding the increment of our own count, and the block following the
// "Net LCL" log line are on lines that are not present in this listing.
1903bool
1905{
1906 // Returns true if there's an *abnormal* ledger issue, normal changing in
1907 // TRACKING mode should return false. Do we have sufficient validations for
1908 // our last closed ledger? Or do sufficient nodes agree? And do we have no
1909 // better ledger available? If so, we are either tracking or full.
1910
1911 JLOG(journal_.trace()) << "NetworkOPsImp::checkLastClosedLedger";
1912
1913 auto const ourClosed = ledgerMaster_.getClosedLedger();
1914
1915 if (!ourClosed)
1916 return false;
1917
1918 uint256 closedLedger = ourClosed->header().hash;
1919 uint256 const prevClosedLedger = ourClosed->header().parentHash;
1920 JLOG(journal_.trace()) << "OurClosed: " << closedLedger;
1921 JLOG(journal_.trace()) << "PrevClosed: " << prevClosedLedger;
1922
1923 //-------------------------------------------------------------------------
1924 // Determine preferred last closed ledger
1925
1926 auto& validations = registry_.get().getValidations();
1927 JLOG(journal_.debug()) << "ValidationTrie " << json::Compact(validations.getJsonTrie());
1928
1929 // Will rely on peer LCL if no trusted validations exist
1931 peerCounts[closedLedger] = 0;
1933 peerCounts[closedLedger]++;
1934
 // Tally how many peers report each closed ledger hash; zero hashes are
 // ignored.
1935 for (auto& peer : peerList)
1936 {
1937 uint256 const peerLedger = peer->getClosedLedgerHash();
1938
1939 if (peerLedger.isNonZero())
1940 ++peerCounts[peerLedger];
1941 }
1942
1943 for (auto const& it : peerCounts)
1944 JLOG(journal_.debug()) << "L: " << it.first << " n=" << it.second;
1945
1946 uint256 const preferredLCL = validations.getPreferredLCL(
1947 RCLValidatedLedger{ourClosed, validations.adaptor().journal()},
1948 ledgerMaster_.getValidLedgerIndex(),
1949 peerCounts);
1950
1951 bool switchLedgers = preferredLCL != closedLedger;
1952 if (switchLedgers)
1953 closedLedger = preferredLCL;
1954 //-------------------------------------------------------------------------
1955 if (switchLedgers && (closedLedger == prevClosedLedger))
1956 {
1957 // don't switch to our own previous ledger
1958 JLOG(journal_.info()) << "We won't switch to our own previous ledger";
1959 networkClosed = ourClosed->header().hash;
1960 switchLedgers = false;
1961 }
1962 else
1963 {
1964 networkClosed = closedLedger;
1965 }
1966
1967 if (!switchLedgers)
1968 return false;
1969
 // Use the ledger if it is already known locally; otherwise request it.
1970 auto consensus = ledgerMaster_.getLedgerByHash(closedLedger);
1971
1972 if (!consensus)
1973 {
1974 consensus = registry_.get().getInboundLedgers().acquire(
1975 closedLedger, 0, InboundLedger::Reason::CONSENSUS);
1976 }
1977
1978 if (consensus &&
1979 (!ledgerMaster_.canBeCurrent(consensus) ||
1980 !ledgerMaster_.isCompatible(*consensus, journal_.debug(), "Not switching")))
1981 {
1982 // Don't switch to a ledger not on the validated chain
1983 // or with an invalid close time or sequence
1984 networkClosed = ourClosed->header().hash;
1985 return false;
1986 }
1987
1988 JLOG(journal_.warn()) << "We are not running on the consensus ledger";
1989 JLOG(journal_.info()) << "Our LCL: " << ourClosed->header().hash << getJson({*ourClosed, {}});
1990 JLOG(journal_.info()) << "Net LCL " << closedLedger;
1991
1993 {
1995 }
1996
1997 if (consensus)
1998 {
1999 // FIXME: If this rewinds the ledger sequence, or has the same
2000 // sequence, we should update the status on any stored transactions
2001 // in the invalidated ledgers.
2002 switchLastClosedLedger(consensus);
2003 }
2004
2005 return true;
2006}
2007
// Makes `newLCL` the last closed ledger: updates the transaction queue's fee
// computations, rebuilds the open ledger on top of newLCL (retrying the local
// transaction set and pulling from the queue), tells ledgerMaster_ to switch,
// and broadcasts a neSWITCHED_LEDGER status change to all peers.
// NOTE(review): the signature, one statement after the initial log line, and
// the declaration of `rules` are on lines that are not present in this
// listing.
2008void
2010{
2011 // set the newLCL as our last closed ledger -- this is abnormal code
2012 JLOG(journal_.error()) << "JUMP last closed ledger to " << newLCL->header().hash;
2013
2015
2016 // Update fee computations.
2017 registry_.get().getTxQ().processClosedLedger(registry_.get().getApp(), *newLCL, true);
2018
2019 // Caller must own master lock
2020 {
2021 // Apply tx in old open ledger to new
2022 // open ledger. Then apply local tx.
2023
2024 auto retries = localTX_->getTxSet();
2025 auto const lastVal = registry_.get().getLedgerMaster().getValidatedLedger();
 // Rules come from the last validated ledger when there is one, otherwise
 // from the configured features alone.
2027 if (lastVal)
2028 {
2029 rules = makeRulesGivenLedger(*lastVal, registry_.get().getApp().config().features);
2030 }
2031 else
2032 {
2033 rules.emplace(registry_.get().getApp().config().features);
2034 }
2035 registry_.get().getOpenLedger().accept(
2036 registry_.get().getApp(),
2037 *rules,
2038 newLCL,
2039 OrderedTxs({}),
2040 false,
2041 retries,
2042 TapNone,
2043 "jump",
2044 [&](OpenView& view, beast::Journal j) {
2045 // Stuff the ledger with transactions from the queue.
2046 return registry_.get().getTxQ().accept(registry_.get().getApp(), view);
2047 });
2048 }
2049
2050 ledgerMaster_.switchLCL(newLCL);
2051
 // Announce the switch to every peer.
2052 protocol::TMStatusChange s;
2053 s.set_newevent(protocol::neSWITCHED_LEDGER);
2054 s.set_ledgerseq(newLCL->header().seq);
2055 s.set_networktime(registry_.get().getTimeKeeper().now().time_since_epoch().count());
2056 s.set_ledgerhashprevious(
2057 newLCL->header().parentHash.begin(), newLCL->header().parentHash.size());
2058 s.set_ledgerhash(newLCL->header().hash.begin(), newLCL->header().hash.size());
2059 registry_.get().getOverlay().foreach(
2060 SendAlways(std::make_shared<Message>(s, protocol::mtSTATUS_CHANGE)));
2061}
2062
// Starts a consensus round on top of the current ledger's parent.
//
// Returns false if the parent ledger cannot be found by hash. Otherwise it
// applies the parent's negative UNL, refreshes the trusted validator set
// (propagating any additions/removals to the validations and amendment
// table), starts the consensus round, reports a consensus phase change if
// there was one, and returns true.
// `networkClosed` must be non-zero (asserted).
// NOTE(review): the function-name line, the second parameter (used below as
// `clog`), and the condition/statement around the "going to tracking" branch
// are on lines that are not present in this listing.
2063bool
2065 uint256 const& networkClosed,
2067{
2068 XRPL_ASSERT(networkClosed.isNonZero(), "xrpl::NetworkOPsImp::beginConsensus : nonzero input");
2069
2070 auto closingInfo = ledgerMaster_.getCurrentLedger()->header();
2071
2072 JLOG(journal_.info()) << "Consensus time for #" << closingInfo.seq << " with LCL "
2073 << closingInfo.parentHash;
2074
2075 auto prevLedger = ledgerMaster_.getLedgerByHash(closingInfo.parentHash);
2076
2077 if (!prevLedger)
2078 {
2079 // this shouldn't happen unless we jump ledgers
2081 {
2082 JLOG(journal_.warn()) << "Don't have LCL, going to tracking";
2084 CLOG(clog) << "beginConsensus Don't have LCL, going to tracking. ";
2085 }
2086
2087 CLOG(clog) << "beginConsensus no previous ledger. ";
2088 return false;
2089 }
2090
2091 XRPL_ASSERT(
2092 prevLedger->header().hash == closingInfo.parentHash,
2093 "xrpl::NetworkOPsImp::beginConsensus : prevLedger hash matches "
2094 "parent");
2095 XRPL_ASSERT(
2096 closingInfo.parentHash == ledgerMaster_.getClosedLedger()->header().hash,
2097 "xrpl::NetworkOPsImp::beginConsensus : closedLedger parent matches "
2098 "hash");
2099
2100 registry_.get().getValidators().setNegativeUNL(prevLedger->negativeUNL());
2101 TrustChanges const changes = registry_.get().getValidators().updateTrusted(
2102 registry_.get().getValidations().getCurrentNodeIDs(),
2103 closingInfo.parentCloseTime,
2104 *this,
2105 registry_.get().getOverlay(),
2106 registry_.get().getHashRouter());
2107
 // Only propagate when the trusted set actually changed.
2108 if (!changes.added.empty() || !changes.removed.empty())
2109 {
2110 registry_.get().getValidations().trustChanged(changes.added, changes.removed);
2111 // Update the AmendmentTable so it tracks the current validators.
2112 registry_.get().getAmendmentTable().trustChanged(
2113 registry_.get().getValidators().getQuorumKeys().second);
2114 }
2115
2116 consensus_.startRound(
2117 registry_.get().getTimeKeeper().closeTime(),
2118 networkClosed,
2119 prevLedger,
2120 changes.removed,
2121 changes.added,
2122 clog);
2123
 // Report the phase only when it differs from the last one seen.
2124 ConsensusPhase const currPhase = consensus_.phase();
2125 if (lastConsensusPhase_ != currPhase)
2126 {
2127 reportConsensusStateChange(currPhase);
2128 lastConsensusPhase_ = currPhase;
2129 }
2130
2131 JLOG(journal_.debug()) << "Initiating consensus engine";
2132 return true;
2133}
2134
// Handles a proposal received from a peer (`peerPos`).
// If the proposal is signed with this node's own validator key (signing or
// master), it is logged as an error and false is returned. Otherwise the
// proposal is handed to consensus_.peerProposal and that call's result is
// returned.
// NOTE(review): the signature line is not present in this listing.
2135bool
2137{
2138 auto const& peerKey = peerPos.publicKey();
2139 if (validatorPK_ == peerKey || validatorMasterPK_ == peerKey)
2140 {
2141 // Could indicate an operator misconfiguration where two nodes are
2142 // running with the same validator key configured, so this isn't fatal,
2143 // and it doesn't necessarily indicate peer misbehavior. But since this
2144 // is a trusted message, it could be a very big deal. Either way, we
2145 // don't want to relay the proposal. Note that the byzantine behavior
2146 // detection in handleNewValidation will notify other peers.
2147 //
2148 // Another, innocuous explanation is unusual message routing and delays,
2149 // causing this node to receive its own messages back.
2150 JLOG(journal_.error()) << "Received a proposal signed by MY KEY from a peer. This may "
2151 "indicate a misconfiguration where another node has the same "
2152 "validator key, or may be caused by unusual message routing and "
2153 "delays.";
2154 return false;
2155 }
2156
2157 return consensus_.peerProposal(registry_.get().getTimeKeeper().closeTime(), peerPos);
2158}
2159
// Announces to all peers that this node now has the transaction set `map`
// (a TMHaveTransactionSet message with status tsHAVE), and, when
// `fromAcquire` is true, passes the set on to the consensus engine.
// NOTE(review): the signature line is not present in this listing.
2160void
2162{
2163 // We now have an additional transaction set
2164 // Inform peers we have this set
2165 protocol::TMHaveTransactionSet msg;
 // The hash is sent as 256 / 8 = 32 bytes.
2166 msg.set_hash(map->getHash().asUInt256().begin(), 256 / 8);
2167 msg.set_status(protocol::tsHAVE);
2168 registry_.get().getOverlay().foreach(
2169 SendAlways(std::make_shared<Message>(msg, protocol::mtHAVE_SET)));
2170
2171 // We acquired it because consensus asked us to
2172 if (fromAcquire)
2173 consensus_.gotTxSet(registry_.get().getTimeKeeper().closeTime(), RCLTxSet{map});
2174}
2175
// Runs at the end of a consensus round.
// Cycles the status of peers still reporting our closed ledger's parent,
// re-evaluates the last closed ledger via checkLastClosedLedger, and, unless
// the resulting network ledger hash is zero, finishes by calling
// beginConsensus for the next round.
// NOTE(review): the signature, the statements that change the operating mode
// inside the two mode checks, and the first half of the second mode check's
// condition are on lines that are not present in this listing.
2176void
2178{
2179 uint256 const deadLedger = ledgerMaster_.getClosedLedger()->header().parentHash;
2180 for (auto const& it : registry_.get().getOverlay().getActivePeers())
2181 {
2182 if (it && (it->getClosedLedgerHash() == deadLedger))
2183 {
2184 JLOG(journal_.trace()) << "Killing obsolete peer status";
2185 it->cycleStatus();
2186 }
2187 }
2188
2189 uint256 networkClosed;
2190 bool const ledgerChange =
2191 checkLastClosedLedger(registry_.get().getOverlay().getActivePeers(), networkClosed);
2192
 // Without a network ledger hash there is nothing to start a round on.
2193 if (networkClosed.isZero())
2194 {
2195 CLOG(clog) << "endConsensus last closed ledger is zero. ";
2196 return;
2197 }
2198
2199 // WRITEME: Unless we are in FULL and in the process of doing a consensus,
2200 // we must count how many nodes share our LCL, how many nodes disagree with
2201 // our LCL, and how many validations our LCL has. We also want to check
2202 // timing to make sure there shouldn't be a newer LCL. We need this
2203 // information to do the next three tests.
2204
2205 if (((mode_ == OperatingMode::CONNECTED) || (mode_ == OperatingMode::SYNCING)) && !ledgerChange)
2206 {
2207 // Count number of peers that agree with us and UNL nodes whose
2208 // validations we have for LCL. If the ledger is good enough, go to
2209 // TRACKING - TODO
2210 if (!needNetworkLedger_)
2212 }
2213
2215 !ledgerChange)
2216 {
2217 // check if the ledger is good enough to go to FULL
2218 // Note: Do not go to FULL if we don't have the previous ledger
2219 // check if the ledger is bad enough to go to CONNECTED -- TODO
2220 auto current = ledgerMaster_.getCurrentLedger();
 // True while "now" is within two close-time resolutions of the parent
 // ledger's close time.
2221 if (registry_.get().getTimeKeeper().now() <
2222 (current->header().parentCloseTime + 2 * current->header().closeTimeResolution))
2223 {
2225 }
2226 }
2227
2228 beginConsensus(networkClosed, clog);
2229}
2230
2231void
2239
// Publishes a "manifestReceived" message describing manifest `mo` to every
// subscriber in streamMaps_[SManifests]. Runs entirely under subLock_.
// Subscribers whose handle can no longer be locked are removed from the map
// while iterating.
// NOTE(review): the signature and the declaration of jvObj are on lines that
// are not present in this listing.
2240void
2242{
2243 // VFALCO consider std::shared_mutex
2244 std::scoped_lock const sl(subLock_);
2245
 // Skip building the message when nobody is subscribed.
2246 if (!streamMaps_[SManifests].empty())
2247 {
2249
2250 jvObj[jss::type] = "manifestReceived";
2251 jvObj[jss::master_key] = toBase58(TokenType::NodePublic, mo.masterKey);
 // signing_key, signature and domain are only emitted when present.
2252 if (mo.signingKey)
2253 jvObj[jss::signing_key] = toBase58(TokenType::NodePublic, *mo.signingKey);
2254 jvObj[jss::seq] = json::UInt(mo.sequence);
2255 if (auto sig = mo.getSignature())
2256 jvObj[jss::signature] = strHex(*sig);
2257 jvObj[jss::master_signature] = strHex(mo.getMasterSignature());
2258 if (!mo.domain.empty())
2259 jvObj[jss::domain] = mo.domain;
2260 jvObj[jss::manifest] = strHex(mo.serialized);
2261
2262 for (auto i = streamMaps_[SManifests].begin(); i != streamMaps_[SManifests].end();)
2263 {
2264 if (auto p = i->second.lock())
2265 {
2266 p->send(jvObj, true);
2267 ++i;
2268 }
2269 else
2270 {
 // erase() returns the next iterator, so no increment here.
2271 i = streamMaps_[SManifests].erase(i);
2272 }
2273 }
2274 }
2275}
2276
// Constructor: captures the server load factor and load base from
// `loadFeeTrack`, the base fee from `fee`, and the escalation metrics.
// NOTE(review): the line carrying the constructor's name is not present in
// this listing.
2278 XRPAmount fee,
2279 TxQ::Metrics escalationMetrics, // trivially copyable
2280 LoadFeeTrack const& loadFeeTrack)
2281 : loadFactorServer{loadFeeTrack.getLoadFactor()}
2282 , loadBaseServer{loadFeeTrack.getLoadBase()}
2283 , baseFee{fee}
2284 , em{escalationMetrics}
2285{
2286}
2287
// Inequality test against another summary `b`.
// Returns true if the visible scalar fields differ or only one side has
// escalation metrics; when both have metrics, returns true if any of the
// three fee levels differ; otherwise returns false.
// NOTE(review): the signature and the first line of the initial condition
// are on lines that are not present in this listing.
2288bool
2290{
2292 baseFee != b.baseFee || em.has_value() != b.em.has_value())
2293 return true;
2294
 // Past the check above, `em` and `b.em` are either both set or both empty.
2295 if (em && b.em)
2296 {
2297 return (
2298 em->minProcessingFeeLevel != b.em->minProcessingFeeLevel ||
2299 em->openLedgerFeeLevel != b.em->openLedgerFeeLevel ||
2300 em->referenceFeeLevel != b.em->referenceFeeLevel);
2301 }
2302
2303 return false;
2304}
2305
2306// Cap a uint64 value to the uint32 range, due to JSON limitations.
// Returns the smaller of `v` and kMax32.
// NOTE(review): the signature and the definition of kMax32 are on lines that
// are not present in this listing.
2307static std::uint32_t
2309{
2311
2312 return std::min(kMax32, v);
2313};
2314
// Publishes a "serverStatus" message (operating mode, load and fee figures)
// to every subscriber in streamMaps_[SServer], and stores the fee summary it
// computed in lastFeeSummary_. Runs entirely under subLock_. Subscribers
// whose handle can no longer be locked are removed from the map while
// iterating.
// NOTE(review): the signature, the declarations of jvObj and `f`, and the
// first argument of the std::max call are on lines that are not present in
// this listing.
2315void
2317{
2318 // VFALCO TODO Don't hold the lock across calls to send...make a copy of the
2319 // list into a local array while holding the lock then release
2320 // the lock and call send on everyone.
2321 //
2322 std::scoped_lock const sl(subLock_);
2323
 // Skip all work when nobody is subscribed.
2324 if (!streamMaps_[SServer].empty())
2325 {
2327
2329 registry_.get().getOpenLedger().current()->fees().base,
2330 registry_.get().getTxQ().getMetrics(*registry_.get().getOpenLedger().current()),
2331 registry_.get().getFeeTrack()};
2332
2333 jvObj[jss::type] = "serverStatus";
2334 jvObj[jss::server_status] = strOperatingMode();
2335 jvObj[jss::load_base] = f.loadBaseServer;
2336 jvObj[jss::load_factor_server] = f.loadFactorServer;
2337 jvObj[jss::base_fee] = f.baseFee.jsonClipped();
2338
2339 if (f.em)
2340 {
 // mulDiv yields no value on overflow; kMuldivMax is used in that case.
2341 auto const loadFactor = std::max(
2343 mulDiv(f.em->openLedgerFeeLevel, f.loadBaseServer, f.em->referenceFeeLevel)
2344 .value_or(xrpl::kMuldivMax));
2345
2346 jvObj[jss::load_factor] = trunc32(loadFactor);
2347 jvObj[jss::load_factor_fee_escalation] = f.em->openLedgerFeeLevel.jsonClipped();
2348 jvObj[jss::load_factor_fee_queue] = f.em->minProcessingFeeLevel.jsonClipped();
2349 jvObj[jss::load_factor_fee_reference] = f.em->referenceFeeLevel.jsonClipped();
2350 }
2351 else
2352 {
2353 jvObj[jss::load_factor] = f.loadFactorServer;
2354 }
2355
2356 lastFeeSummary_ = f;
2357
2358 for (auto i = streamMaps_[SServer].begin(); i != streamMaps_[SServer].end();)
2359 {
2360 InfoSub::pointer const p = i->second.lock();
2361
2362 // VFALCO TODO research the possibility of using thread queues and
2363 // linearizing the deletion of subscribers with the
2364 // sending of JSON data.
2365 if (p)
2366 {
2367 p->send(jvObj, true);
2368 ++i;
2369 }
2370 else
2371 {
 // erase() returns the next iterator, so no increment here.
2372 i = streamMaps_[SServer].erase(i);
2373 }
2374 }
2375 }
2376}
2377
// Publishes a "consensusPhase" message carrying to_string(phase) to every
// subscriber in streamMaps_[SConsensusPhase]. Runs entirely under subLock_.
// Subscribers whose handle can no longer be locked are removed from the map
// while iterating.
// NOTE(review): the signature and the declaration of jvObj are on lines that
// are not present in this listing.
2378void
2380{
2381 std::scoped_lock const sl(subLock_);
2382
2383 auto& streamMap = streamMaps_[SConsensusPhase];
2384 if (!streamMap.empty())
2385 {
2387 jvObj[jss::type] = "consensusPhase";
2388 jvObj[jss::consensus] = to_string(phase);
2389
2390 for (auto i = streamMap.begin(); i != streamMap.end();)
2391 {
2392 if (auto p = i->second.lock())
2393 {
2394 p->send(jvObj, true);
2395 ++i;
2396 }
2397 else
2398 {
 // erase() returns the next iterator, so no increment here.
2399 i = streamMap.erase(i);
2400 }
2401 }
2402 }
2403}
2404
// Publishes a "validationReceived" message describing validation `val` to
// every subscriber in streamMaps_[SValidations]. Runs entirely under
// subLock_. Optional validation fields are only emitted when present. The
// message is wrapped in a MultiApiJson so that each subscriber receives the
// variant matching its API version. Subscribers whose handle can no longer be
// locked are removed from the map while iterating.
// NOTE(review): the signature, the declaration of jvObj, and the first
// argument of the first multiObj.visit call are on lines that are not
// present in this listing.
2405void
2407{
2408 // VFALCO consider std::shared_mutex
2409 std::scoped_lock const sl(subLock_);
2410
 // Skip all work when nobody is subscribed.
2411 if (!streamMaps_[SValidations].empty())
2412 {
2414
2415 auto const signerPublic = val->getSignerPublic();
2416
2417 jvObj[jss::type] = "validationReceived";
2418 jvObj[jss::validation_public_key] = toBase58(TokenType::NodePublic, signerPublic);
2419 jvObj[jss::ledger_hash] = to_string(val->getLedgerHash());
2420 jvObj[jss::signature] = strHex(val->getSignature());
2421 jvObj[jss::full] = val->isFull();
2422 jvObj[jss::flags] = val->getFlags();
2423 jvObj[jss::signing_time] = *(*val)[~sfSigningTime];
2424 jvObj[jss::data] = strHex(val->getSerializer().slice());
2425 jvObj[jss::network_id] = registry_.get().getNetworkIDService().getNetworkID();
2426
2427 if (auto version = (*val)[~sfServerVersion])
2428 jvObj[jss::server_version] = std::to_string(*version);
2429
2430 if (auto cookie = (*val)[~sfCookie])
2431 jvObj[jss::cookie] = std::to_string(*cookie);
2432
2433 if (auto hash = (*val)[~sfValidatedHash])
2434 jvObj[jss::validated_hash] = strHex(*hash);
2435
2436 auto const masterKey = registry_.get().getValidatorManifests().getMasterKey(signerPublic);
2437
 // master_key is only reported when it differs from the signing key.
2438 if (masterKey != signerPublic)
2439 jvObj[jss::master_key] = toBase58(TokenType::NodePublic, masterKey);
2440
2441 // NOTE *seq is a number, but old API versions used string. We replace
2442 // number with a string using MultiApiJson near end of this function
2443 if (auto const seq = (*val)[~sfLedgerSequence])
2444 jvObj[jss::ledger_index] = *seq;
2445
2446 if (val->isFieldPresent(sfAmendments))
2447 {
2448 jvObj[jss::amendments] = json::Value(json::ValueType::Array);
2449 for (auto const& amendment : val->getFieldV256(sfAmendments))
2450 jvObj[jss::amendments].append(to_string(amendment));
2451 }
2452
2453 if (auto const closeTime = (*val)[~sfCloseTime])
2454 jvObj[jss::close_time] = *closeTime;
2455
2456 if (auto const loadFee = (*val)[~sfLoadFee])
2457 jvObj[jss::load_fee] = *loadFee;
2458
2459 if (auto const baseFee = val->at(~sfBaseFee))
2460 jvObj[jss::base_fee] = static_cast<double>(*baseFee);
2461
2462 if (auto const reserveBase = val->at(~sfReserveBase))
2463 jvObj[jss::reserve_base] = *reserveBase;
2464
2465 if (auto const reserveInc = val->at(~sfReserveIncrement))
2466 jvObj[jss::reserve_inc] = *reserveInc;
2467
 // The *Drops fields below write the same JSON keys as the fields above,
 // so when both are present the *Drops value is the one published.
2468 // (The ~ operator converts the Proxy to a std::optional, which
2469 // simplifies later operations)
2470 if (auto const baseFeeXRP = ~val->at(~sfBaseFeeDrops); baseFeeXRP && baseFeeXRP->native())
2471 jvObj[jss::base_fee] = baseFeeXRP->xrp().jsonClipped();
2472
2473 if (auto const reserveBaseXRP = ~val->at(~sfReserveBaseDrops);
2474 reserveBaseXRP && reserveBaseXRP->native())
2475 jvObj[jss::reserve_base] = reserveBaseXRP->xrp().jsonClipped();
2476
2477 if (auto const reserveIncXRP = ~val->at(~sfReserveIncrementDrops);
2478 reserveIncXRP && reserveIncXRP->native())
2479 jvObj[jss::reserve_inc] = reserveIncXRP->xrp().jsonClipped();
2480
2481 // NOTE Use MultiApiJson to publish two slightly different JSON objects
2482 // for consumers supporting different API versions
2483 MultiApiJson multiObj{jvObj};
2484 multiObj.visit(
2486 [](json::Value& jvTx) {
2487 // Type conversion for older API versions to string
2488 if (jvTx.isMember(jss::ledger_index))
2489 {
2490 jvTx[jss::ledger_index] = std::to_string(jvTx[jss::ledger_index].asUInt());
2491 }
2492 });
2493
2494 for (auto i = streamMaps_[SValidations].begin(); i != streamMaps_[SValidations].end();)
2495 {
2496 if (auto p = i->second.lock())
2497 {
 // Send the variant that matches this subscriber's API version.
2498 multiObj.visit(
2499 p->getApiVersion(), //
2500 [&](json::Value const& jv) { p->send(jv, true); });
2501 ++i;
2502 }
2503 else
2504 {
 // erase() returns the next iterator, so no increment here.
2505 i = streamMaps_[SValidations].erase(i);
2506 }
2507 }
2508 }
2509}
2510
// Publishes a "peerStatusChange" message to every subscriber in
// streamMaps_[SPeerStatus]. The message body is produced by calling `func`,
// which only happens when there is at least one subscriber. Runs entirely
// under subLock_. Subscribers whose handle can no longer be locked are
// removed from the map while iterating.
// NOTE(review): the signature line is not present in this listing.
2511void
2513{
2514 std::scoped_lock const sl(subLock_);
2515
2516 if (!streamMaps_[SPeerStatus].empty())
2517 {
 // func() is invoked lazily, only when someone is listening.
2518 json::Value jvObj(func());
2519
2520 jvObj[jss::type] = "peerStatusChange";
2521
2522 for (auto i = streamMaps_[SPeerStatus].begin(); i != streamMaps_[SPeerStatus].end();)
2523 {
2524 InfoSub::pointer const p = i->second.lock();
2525
2526 if (p)
2527 {
2528 p->send(jvObj, true);
2529 ++i;
2530 }
2531 else
2532 {
 // erase() returns the next iterator, so no increment here.
2533 i = streamMaps_[SPeerStatus].erase(i);
2534 }
2535 }
2536 }
2537}
2538
// Sets the operating mode to `om` (possibly adjusted by the checks at the
// top), records it in accounting_, logs the new state, and publishes a server
// status update. Does nothing further if the mode is unchanged.
// NOTE(review): the signature and the statements executed by the three
// adjustment checks (validated-ledger age below/above one minute, and the
// blocked check) are on lines that are not present in this listing.
2539void
2541{
2542 using namespace std::chrono_literals;
2543 if (om == OperatingMode::CONNECTED)
2544 {
2545 if (registry_.get().getLedgerMaster().getValidatedLedgerAge() < 1min)
2547 }
2548 else if (om == OperatingMode::SYNCING)
2549 {
2550 if (registry_.get().getLedgerMaster().getValidatedLedgerAge() >= 1min)
2552 }
2553
2554 if ((om > OperatingMode::CONNECTED) && isBlocked())
2556
 // No state change: skip accounting, logging and publishing.
2557 if (mode_ == om)
2558 return;
2559
2560 mode_ = om;
2561
2562 accounting_.mode(om);
2563
2564 JLOG(journal_.info()) << "STATE->" << strOperatingMode();
2565 pubServer();
2566}
2567
// Processes a validation `val` received from `source`.
//
// The ledger hash is tracked in pendingValidations_ while it is being
// handled: if the hash is already present, handleNewValidation is called with
// BypassAccept::Yes; otherwise the hash is inserted, BypassAccept::No is
// used, and the hash is erased again afterwards. Exceptions thrown while
// handling are logged and swallowed. The validation is then published to
// subscribers and logged.
//
// Returns true if the validation is trusted, or if the configuration's
// relayUntrustedValidations equals 1.
// NOTE(review): the signature, the creation of `lock`, and the declaration
// of `ss` inside the logging lambda are on lines that are not present in
// this listing.
2568bool
2570{
2571 JLOG(journal_.trace()) << "recvValidation " << val->getLedgerHash() << " from " << source;
2572
2574 BypassAccept bypassAccept = BypassAccept::No;
2575 try
2576 {
2577 if (pendingValidations_.contains(val->getLedgerHash()))
2578 {
2579 bypassAccept = BypassAccept::Yes;
2580 }
2581 else
2582 {
2583 pendingValidations_.insert(val->getLedgerHash());
2584 }
 // `lock` is released for the duration of handleNewValidation.
2585 ScopeUnlock const unlock(lock);
2586 handleNewValidation(registry_.get().getApp(), val, source, bypassAccept, journal_);
2587 }
2588 catch (std::exception const& e)
2589 {
2590 JLOG(journal_.warn()) << "Exception thrown for handling new validation "
2591 << val->getLedgerHash() << ": " << e.what();
2592 }
2593 catch (...)
2594 {
2595 JLOG(journal_.warn()) << "Unknown exception thrown for handling new validation "
2596 << val->getLedgerHash();
2597 }
 // Only the call that inserted the hash removes it.
2598 if (bypassAccept == BypassAccept::No)
2599 {
2600 pendingValidations_.erase(val->getLedgerHash());
2601 }
2602 lock.unlock();
2603
2604 pubValidation(val);
2605
2606 JLOG(journal_.debug()) << [this, &val]() -> auto {
2608 ss << "VALIDATION: " << val->render() << " master_key: ";
2609 auto master = registry_.get().getValidators().getTrustedKey(val->getSignerPublic());
2610 if (master)
2611 {
2612 ss << toBase58(TokenType::NodePublic, *master);
2613 }
2614 else
2615 {
2616 ss << "none";
2617 }
2618 return ss.str();
2619 }();
2620
2621 // We will always relay trusted validations; if configured, we will
2622 // also relay all untrusted validations.
2623 return registry_.get().getApp().config().relayUntrustedValidations == 1 || val->isTrusted();
2624}
2625
// Returns the consensus engine's state as JSON, requesting the full form
// (getJson is called with `true`).
// NOTE(review): the return type and signature are on lines that are not
// present in this listing.
2628{
2629 return consensus_.getJson(true);
2630}
2631
// Builds and returns the server-info JSON object `info`.
//   human    - selects human-readable representations (decimal XRP amounts,
//              load factors as ratios, textual validator-list status) instead
//              of raw numeric values.
//   admin    - adds fields that are only emitted for admin callers (node size,
//              validator list details, git info, validator public key, job
//              queue load, extra load factors, admin ports).
//   counters - adds perf-log counters, node store counts and current
//              activities.
// NOTE(review): several declarations used below (info, warnings, w, nodestore,
// lastClose, l, ports, proto) and a few guard conditions fall on listing lines
// that are missing from this listing - confirm against the original file.
2633NetworkOPsImp::getServerInfo(bool human, bool admin, bool counters)
2634{
2636
2637 // System-level warnings
2638 {
// NOTE(review): `w` is presumably a new entry appended to `warnings` on the
// missing line that precedes each pair of assignments - confirm.
2640 if (isAmendmentBlocked())
2641 {
2643 w[jss::id] = WarnRpcAmendmentBlocked;
2644 w[jss::message] =
2645 "This server is amendment blocked, and must be updated to be "
2646 "able to stay in sync with the network.";
2647 }
2648 if (isUNLBlocked())
2649 {
2651 w[jss::id] = WarnRpcExpiredValidatorList;
2652 w[jss::message] =
2653 "This server has an expired validator list. validators.txt "
2654 "may be incorrectly configured or some [validator_list_sites] "
2655 "may be unreachable.";
2656 }
// The unsupported-majority warning is only produced for admin callers.
2657 if (admin && isAmendmentWarned())
2658 {
2660 w[jss::id] = WarnRpcUnsupportedMajority;
2661 w[jss::message] =
2662 "One or more unsupported amendments have reached majority. "
2663 "Upgrade to the latest version before they are activated "
2664 "to avoid being amendment blocked.";
2665 if (auto const expected =
2666 registry_.get().getAmendmentTable().firstUnsupportedExpected())
2667 {
2668 auto& d = w[jss::details] = json::ValueType::Object;
2669 d[jss::expected_date] = expected->time_since_epoch().count();
2670 d[jss::expected_date_UTC] = to_string(*expected);
2671 }
2672 }
2673
// The warnings field is omitted entirely when no warning was produced.
2674 if (warnings.size() != 0u)
2675 info[jss::warnings] = std::move(warnings);
2676 }
2677
2678 // hostid: unique string describing the machine
2679 if (human)
2680 info[jss::hostid] = getHostId(admin);
2681
2682 // domain: if configured with a domain, report it:
2683 if (!registry_.get().getApp().config().serverDomain.empty())
2684 info[jss::server_domain] = registry_.get().getApp().config().serverDomain;
2685
2686 info[jss::build_version] = BuildInfo::getVersionString();
2687
2688 info[jss::server_state] = strOperatingMode(admin);
2689
2690 info[jss::time] =
2691 to_string(std::chrono::floor<std::chrono::microseconds>(std::chrono::system_clock::now()));
2692
// NOTE(review): listing line 2693 is missing; this assignment is presumably
// guarded by a condition on that line - confirm.
2694 info[jss::network_ledger] = "waiting";
2695
2696 info[jss::validation_quorum] =
2697 static_cast<json::UInt>(registry_.get().getValidators().quorum());
2698
2699 if (admin)
2700 {
2701 // Note: By default the node size is "tiny". When parsing it's an error if the final
2702 // NODE_SIZE is over 4 so below code should be safe.
2703 // NOLINTNEXTLINE(bugprone-switch-missing-default-case)
2704 switch (registry_.get().getApp().config().nodeSize)
2705 {
2706 case 0:
2707 info[jss::node_size] = "tiny";
2708 break;
2709 case 1:
2710 info[jss::node_size] = "small";
2711 break;
2712 case 2:
2713 info[jss::node_size] = "medium";
2714 break;
2715 case 3:
2716 info[jss::node_size] = "large";
2717 break;
2718 case 4:
2719 info[jss::node_size] = "huge";
2720 break;
2721 }
2722
// Validator list expiration: the non-human form is a single numeric field
// (0 when no expiration is available); the human form is an object with
// count, expiration and status.
2723 auto when = registry_.get().getValidators().expires();
2724
2725 if (!human)
2726 {
2727 if (when)
2728 {
2729 info[jss::validator_list_expires] =
2730 safeCast<json::UInt>(when->time_since_epoch().count());
2731 }
2732 else
2733 {
2734 info[jss::validator_list_expires] = 0;
2735 }
2736 }
2737 else
2738 {
2739 auto& x = (info[jss::validator_list] = json::ValueType::Object);
2740
2741 x[jss::count] = static_cast<json::UInt>(registry_.get().getValidators().count());
2742
2743 if (when)
2744 {
// An expiration equal to time_point::max() is reported as "never".
2745 if (*when == TimeKeeper::time_point::max())
2746 {
2747 x[jss::expiration] = "never";
2748 x[jss::status] = "active";
2749 }
2750 else
2751 {
2752 x[jss::expiration] = to_string(*when);
2753
2754 if (*when > registry_.get().getTimeKeeper().now())
2755 {
2756 x[jss::status] = "active";
2757 }
2758 else
2759 {
2760 x[jss::status] = "expired";
2761 }
2762 }
2763 }
2764 else
2765 {
2766 x[jss::status] = "unknown";
2767 x[jss::expiration] = "unknown";
2768 }
2769 }
2770
// The git object is only emitted when a commit hash or a branch name is
// available, and contains only the non-empty members.
2771 if (!xrpl::git::getCommitHash().empty() || !xrpl::git::getBuildBranch().empty())
2772 {
2773 auto& x = (info[jss::git] = json::ValueType::Object);
2774 if (!xrpl::git::getCommitHash().empty())
2775 x[jss::hash] = xrpl::git::getCommitHash();
2776 if (!xrpl::git::getBuildBranch().empty())
2777 x[jss::branch] = xrpl::git::getBuildBranch();
2778 }
2779 }
2780 info[jss::io_latency_ms] =
2781 static_cast<json::UInt>(registry_.get().getApp().getIOLatency().count());
2782
2783 if (admin)
2784 {
// pubkey_validator is the local validator public key when both the validator
// list and the application report one, otherwise the string "none".
2785 if (auto const localPubKey = registry_.get().getValidators().localPublicKey();
2786 localPubKey && registry_.get().getApp().getValidationPublicKey())
2787 {
2788 info[jss::pubkey_validator] = toBase58(TokenType::NodePublic, localPubKey.value());
2789 }
2790 else
2791 {
2792 info[jss::pubkey_validator] = "none";
2793 }
2794 }
2795
2796 if (counters)
2797 {
2798 info[jss::counters] = registry_.get().getPerfLog().countersJson();
2799
// NOTE(review): `nodestore` is declared on the missing listing line 2800.
2801 registry_.get().getNodeStore().getCountsJson(nodestore);
2802 info[jss::counters][jss::nodestore] = nodestore;
2803 info[jss::current_activities] = registry_.get().getPerfLog().currentJson();
2804 }
2805
2806 info[jss::pubkey_node] =
2807 toBase58(TokenType::NodePublic, registry_.get().getApp().nodeIdentity().first);
2808
2809 info[jss::complete_ledgers] = registry_.get().getLedgerMaster().getCompleteLedgers();
2810
// NOTE(review): listing line 2811 is missing; this assignment is presumably
// guarded by a condition on that line - confirm.
2812 info[jss::amendment_blocked] = true;
2813
2814 auto const fp = ledgerMaster_.getFetchPackCacheSize();
2815
2816 if (fp != 0)
2817 info[jss::fetch_pack] = json::UInt(fp);
2818
2819 info[jss::peers] = json::UInt(registry_.get().getOverlay().size());
2820
// NOTE(review): `lastClose` is declared on the missing listing line 2821.
2822 lastClose[jss::proposers] = json::UInt(consensus_.prevProposers());
2823
2824 if (human)
2825 {
// NOTE(review): the right-hand side of this assignment (listing line 2827) is
// missing from this listing.
2826 lastClose[jss::converge_time_s] =
2828 }
2829 else
2830 {
2831 lastClose[jss::converge_time] = json::Int(consensus_.prevRoundTime().count());
2832 }
2833
2834 info[jss::last_close] = lastClose;
2835
2836 // info[jss::consensus] = consensus_.getJson();
2837
2838 if (admin)
2839 info[jss::load] = jobQueue_.getJson();
2840
2841 if (auto const netid = registry_.get().getOverlay().networkID())
2842 info[jss::network_id] = static_cast<json::UInt>(*netid);
2843
2844 auto const escalationMetrics =
2845 registry_.get().getTxQ().getMetrics(*registry_.get().getOpenLedger().current());
2846
2847 auto const loadFactorServer = registry_.get().getFeeTrack().getLoadFactor();
2848 auto const loadBaseServer = registry_.get().getFeeTrack().getLoadBase();
2849 /* Scale the escalated fee level to unitless "load factor".
2850 In practice, this just strips the units, but it will continue
2851 to work correctly if either base value ever changes. */
// NOTE(review): the end of this statement (listing line 2856) is missing from
// this listing.
2852 auto const loadFactorFeeEscalation = mulDiv(
2853 escalationMetrics.openLedgerFeeLevel,
2854 loadBaseServer,
2855 escalationMetrics.referenceFeeLevel)
2857
// The reported load factor is the larger of the server load factor and the
// fee-escalation load factor.
2858 auto const loadFactor =
2859 std::max(safeCast<std::uint64_t>(loadFactorServer), loadFactorFeeEscalation);
2860
2861 if (!human)
2862 {
2863 info[jss::load_base] = loadBaseServer;
2864 info[jss::load_factor] = trunc32(loadFactor);
2865 info[jss::load_factor_server] = loadFactorServer;
2866
2867 /* json::Value doesn't support uint64, so clamp to max
2868 uint32 value. This is mostly theoretical, since there
2869 probably isn't enough extant XRP to drive the factor
2870 that high.
2871 */
2872 info[jss::load_factor_fee_escalation] = escalationMetrics.openLedgerFeeLevel.jsonClipped();
2873 info[jss::load_factor_fee_queue] = escalationMetrics.minProcessingFeeLevel.jsonClipped();
2874 info[jss::load_factor_fee_reference] = escalationMetrics.referenceFeeLevel.jsonClipped();
2875 }
2876 else
2877 {
// Human form: factors are expressed as ratios to loadBaseServer, and most
// fields are emitted only when they differ from their baseline value.
2878 info[jss::load_factor] = static_cast<double>(loadFactor) / loadBaseServer;
2879
2880 if (loadFactorServer != loadFactor)
2881 info[jss::load_factor_server] = static_cast<double>(loadFactorServer) / loadBaseServer;
2882
2883 if (admin)
2884 {
2885 std::uint32_t fee = registry_.get().getFeeTrack().getLocalFee();
2886 if (fee != loadBaseServer)
2887 info[jss::load_factor_local] = static_cast<double>(fee) / loadBaseServer;
2888 fee = registry_.get().getFeeTrack().getRemoteFee();
2889 if (fee != loadBaseServer)
2890 info[jss::load_factor_net] = static_cast<double>(fee) / loadBaseServer;
2891 fee = registry_.get().getFeeTrack().getClusterFee();
2892 if (fee != loadBaseServer)
2893 info[jss::load_factor_cluster] = static_cast<double>(fee) / loadBaseServer;
2894 }
2895 if (escalationMetrics.openLedgerFeeLevel != escalationMetrics.referenceFeeLevel &&
2896 (admin || loadFactorFeeEscalation != loadFactor))
2897 {
2898 info[jss::load_factor_fee_escalation] =
2899 escalationMetrics.openLedgerFeeLevel.decimalFromReference(
2900 escalationMetrics.referenceFeeLevel);
2901 }
2902 if (escalationMetrics.minProcessingFeeLevel != escalationMetrics.referenceFeeLevel)
2903 {
2904 info[jss::load_factor_fee_queue] =
2905 escalationMetrics.minProcessingFeeLevel.decimalFromReference(
2906 escalationMetrics.referenceFeeLevel);
2907 }
2908 }
2909
// Prefer the validated ledger; fall back to the closed ledger when no
// validated ledger is available. `valid` records which one was used and
// decides below whether the result is stored under validated_ledger or
// closed_ledger.
2910 bool valid = false;
2911 auto lpClosed = ledgerMaster_.getValidatedLedger();
2912
2913 if (lpClosed)
2914 {
2915 valid = true;
2916 }
2917 else
2918 {
2919 lpClosed = ledgerMaster_.getClosedLedger();
2920 }
2921
2922 if (lpClosed)
2923 {
2924 XRPAmount const baseFee = lpClosed->fees().base;
// NOTE(review): `l` is declared on the missing listing line 2925.
2926 l[jss::seq] = json::UInt(lpClosed->header().seq);
2927 l[jss::hash] = to_string(lpClosed->header().hash);
2928
2929 if (!human)
2930 {
2931 l[jss::base_fee] = baseFee.jsonClipped();
2932 l[jss::reserve_base] = lpClosed->fees().reserve.jsonClipped();
2933 l[jss::reserve_inc] = lpClosed->fees().increment.jsonClipped();
2934 l[jss::close_time] =
2935 json::Value::UInt(lpClosed->header().closeTime.time_since_epoch().count());
2936 }
2937 else
2938 {
2939 l[jss::base_fee_xrp] = baseFee.decimalXRP();
2940 l[jss::reserve_base_xrp] = lpClosed->fees().reserve.decimalXRP();
2941 l[jss::reserve_inc_xrp] = lpClosed->fees().increment.decimalXRP();
2942
// The close time offset is only reported when its magnitude is at least 60.
// NOTE(review): closeOffset can be negative here (the guard uses std::abs),
// yet it is cast to an unsigned 32-bit value - confirm this is intended.
2943 if (auto const closeOffset = registry_.get().getTimeKeeper().closeOffset();
2944 std::abs(closeOffset.count()) >= 60)
2945 l[jss::close_time_offset] = static_cast<std::uint32_t>(closeOffset.count());
2946
// Ages at or above kHighAgeThreshold are reported as 0.
2947 static constexpr std::chrono::seconds kHighAgeThreshold{1000000};
2948 if (ledgerMaster_.haveValidated())
2949 {
2950 auto const age = ledgerMaster_.getValidatedLedgerAge();
2951 l[jss::age] = json::UInt(age < kHighAgeThreshold ? age.count() : 0);
2952 }
2953 else
2954 {
// Without a validated ledger the age is derived from the ledger's close time,
// and is omitted when that close time is later than the time keeper's
// closeTime().
2955 auto lCloseTime = lpClosed->header().closeTime;
2956 auto closeTime = registry_.get().getTimeKeeper().closeTime();
2957 if (lCloseTime <= closeTime)
2958 {
2959 using namespace std::chrono_literals;
2960 auto age = closeTime - lCloseTime;
2961 l[jss::age] = json::UInt(age < kHighAgeThreshold ? age.count() : 0);
2962 }
2963 }
2964 }
2965
2966 if (valid)
2967 {
2968 info[jss::validated_ledger] = l;
2969 }
2970 else
2971 {
2972 info[jss::closed_ledger] = l;
2973 }
2974
// published_ledger is "none" when there is no published ledger, the published
// sequence when it differs from the ledger reported above, and omitted when
// the two sequences match.
2975 auto lpPublished = ledgerMaster_.getPublishedLedger();
2976 if (!lpPublished)
2977 {
2978 info[jss::published_ledger] = "none";
2979 }
2980 else if (lpPublished->header().seq != lpClosed->header().seq)
2981 {
2982 info[jss::published_ledger] = lpPublished->header().seq;
2983 }
2984 }
2985
2986 accounting_.json(info);
2987 info[jss::uptime] = UptimeClock::now().time_since_epoch().count();
// These overlay counters are emitted as decimal strings via std::to_string.
2988 info[jss::jq_trans_overflow] =
2989 std::to_string(registry_.get().getOverlay().getJqTransOverflow());
2990 info[jss::peer_disconnects] = std::to_string(registry_.get().getOverlay().getPeerDisconnect());
2991 info[jss::peer_disconnects_resources] =
2992 std::to_string(registry_.get().getOverlay().getPeerDisconnectCharges());
2993
2994 // This array must be sorted in increasing order.
2995 static constexpr std::array<std::string_view, 7> kProtocols{
2996 "http", "https", "peer", "ws", "ws2", "wss", "wss2"};
2997 static_assert(std::ranges::is_sorted(kProtocols));
2998 {
// NOTE(review): `ports` is declared on the missing listing line 2999.
3000 for (auto const& port : registry_.get().getServerHandler().setup().ports)
3001 {
3002 // Don't publish admin ports for non-admin users
3003 if (!admin &&
3004 !(port.adminNetsV4.empty() && port.adminNetsV6.empty() && port.adminUser.empty() &&
3005 port.adminPassword.empty()))
3006 continue;
// NOTE(review): the declaration of `proto` and the name of the algorithm
// called with the arguments below are on missing listing lines (3007, 3009);
// the result is collected into `proto` through std::back_inserter.
3008 // NOLINTNEXTLINE(modernize-use-ranges)
3010 std::begin(port.protocol),
3011 std::end(port.protocol),
3012 std::begin(kProtocols),
3013 std::end(kProtocols),
3014 std::back_inserter(proto));
// A port is only listed when `proto` ended up non-empty.
3015 if (!proto.empty())
3016 {
3017 auto& jv = ports.append(json::Value(json::ValueType::Object));
3018 jv[jss::port] = std::to_string(port.port);
3019 jv[jss::protocol] = json::Value{json::ValueType::Array};
3020 for (auto const& p : proto)
3021 jv[jss::protocol].append(p);
3022 }
3023 }
3024
// The gRPC port is reported separately, and only when its config section has
// both a port and an ip value.
3025 if (registry_.get().getApp().config().exists(Sections::kPortGrpc))
3026 {
3027 auto const& grpcSection =
3028 registry_.get().getApp().config().section(Sections::kPortGrpc);
3029 auto const optPort = grpcSection.get(Keys::kPort);
3030 if (optPort && grpcSection.get(Keys::kIp))
3031 {
3032 auto& jv = ports.append(json::Value(json::ValueType::Object));
3033 jv[jss::port] = *optPort;
3034 jv[jss::protocol] = json::Value{json::ValueType::Array};
3035 jv[jss::protocol].append("grpc");
3036 }
3037 }
3038 info[jss::ports] = std::move(ports);
3039 }
3040
3041 return info;
3042}
3043
// Forwards to InboundLedgers::clearFailures() on the registry's inbound
// ledgers object.
// NOTE(review): the function name is on a missing listing line (3045);
// presumably clearLedgerFetch - confirm.
3044void
3046{
3047 registry_.get().getInboundLedgers().clearFailures();
3048}
3049
// Returns the result of getInfo() on the registry's inbound ledgers object.
// NOTE(review): the return type and function name are on missing listing
// lines; presumably getLedgerFetchInfo - confirm.
3052{
3053 return registry_.get().getInboundLedgers().getInfo();
3054}
3055
// Publishes a not-yet-validated transaction to every live subscriber in
// streamMaps_[SRtTransactions], then forwards it to
// pubProposedAccountTransaction.
//   ledger      - view passed to transJson to build the message.
//   transaction - the transaction to publish; inner batch transactions are
//                 skipped.
//   result      - the transaction's result code.
// NOTE(review): the function name is on a missing listing line (3057);
// presumably pubProposedTransaction - confirm.
3056void
3058 std::shared_ptr<ReadView const> const& ledger,
3059 std::shared_ptr<STTx const> const& transaction,
3060 TER result)
3061{
3062 // never publish an inner txn inside a batch txn. The flag should
3063 // only be set if the Batch feature is enabled. If Batch is not
3064 // enabled, the flag is always invalid, so don't publish it
3065 // regardless.
3066 if (transaction->isFlag(tfInnerBatchTxn))
3067 return;
3068
// Built once, with validated == false and no metadata, before subLock_ is
// taken.
3069 MultiApiJson const jvObj = transJson(transaction, result, false, ledger, std::nullopt);
3070
3071 {
3072 std::scoped_lock const sl(subLock_);
3073
// Send to each subscriber that is still alive, using the JSON variant for that
// subscriber's API version; entries whose weak pointer has expired are erased
// during the walk.
3074 auto it = streamMaps_[SRtTransactions].begin();
3075 while (it != streamMaps_[SRtTransactions].end())
3076 {
3077 InfoSub::pointer p = it->second.lock();
3078
3079 if (p)
3080 {
3081 jvObj.visit(
3082 p->getApiVersion(), //
3083 [&](json::Value const& jv) { p->send(jv, true); });
3084 ++it;
3085 }
3086 else
3087 {
3088 it = streamMaps_[SRtTransactions].erase(it);
3089 }
3090 }
3091 }
3092
3093 pubProposedAccountTransaction(ledger, transaction, result);
3094}
3095
// Publishes an accepted ledger (`lpAccepted`, named in the body): sends a
// "ledgerClosed" message to ledger-stream subscribers, book changes to
// book-changes subscribers, starts delayed account-history subscriptions on
// the first call, and finally publishes each transaction of the ledger.
// NOTE(review): the signature (listing line 3097), the declaration of
// `alpAccepted` (3102), the declaration of `jvObj` (3123) and one condition
// (3141) are on lines missing from this listing.
3096void
3098{
3099 // Ledgers are published only when they acquire sufficient validations
3100 // Holes are filled across connection loss or other catastrophe
3101
// Look the accepted ledger up in the cache by hash; on a miss, build it and
// store it in the cache.
3103 registry_.get().getAcceptedLedgerCache().fetch(lpAccepted->header().hash);
3104 if (!alpAccepted)
3105 {
3106 alpAccepted = std::make_shared<AcceptedLedger>(lpAccepted);
3107 registry_.get().getAcceptedLedgerCache().canonicalizeReplaceClient(
3108 lpAccepted->header().hash, alpAccepted);
3109 }
3110
3111 XRPL_ASSERT(
3112 alpAccepted->getLedger().get() == lpAccepted.get(),
3113 "xrpl::NetworkOPsImp::pubLedger : accepted input");
3114
3115 {
3116 JLOG(journal_.debug()) << "Publishing ledger " << lpAccepted->header().seq << " "
3117 << lpAccepted->header().hash;
3118
3119 std::scoped_lock const sl(subLock_);
3120
// The ledgerClosed message is only built when there is at least one entry in
// the ledger stream map.
3121 if (!streamMaps_[SLedger].empty())
3122 {
3124
3125 jvObj[jss::type] = "ledgerClosed";
3126 jvObj[jss::ledger_index] = lpAccepted->header().seq;
3127 jvObj[jss::ledger_hash] = to_string(lpAccepted->header().hash);
3128 jvObj[jss::ledger_time] =
3129 json::Value::UInt(lpAccepted->header().closeTime.time_since_epoch().count());
3130
3131 jvObj[jss::network_id] = registry_.get().getNetworkIDService().getNetworkID();
3132
// fee_ref is only included while the XRPFees feature is not enabled.
3133 if (!lpAccepted->rules().enabled(featureXRPFees))
3134 jvObj[jss::fee_ref] = kFeeUnitsDeprecated;
3135 jvObj[jss::fee_base] = lpAccepted->fees().base.jsonClipped();
3136 jvObj[jss::reserve_base] = lpAccepted->fees().reserve.jsonClipped();
3137 jvObj[jss::reserve_inc] = lpAccepted->fees().increment.jsonClipped();
3138
3139 jvObj[jss::txn_count] = json::UInt(alpAccepted->size());
3140
// NOTE(review): the condition guarding this block (listing line 3141) is
// missing from this listing.
3142 {
3143 jvObj[jss::validated_ledgers] =
3144 registry_.get().getLedgerMaster().getCompleteLedgers();
3145 }
3146
// Send to live subscribers; erase entries whose weak pointer has expired.
3147 auto it = streamMaps_[SLedger].begin();
3148 while (it != streamMaps_[SLedger].end())
3149 {
3150 InfoSub::pointer const p = it->second.lock();
3151 if (p)
3152 {
3153 p->send(jvObj, true);
3154 ++it;
3155 }
3156 else
3157 {
3158 it = streamMaps_[SLedger].erase(it);
3159 }
3160 }
3161 }
3162
// Book changes are only computed when the book-changes stream map is not
// empty.
3163 if (!streamMaps_[SBookChanges].empty())
3164 {
3165 json::Value const jvObj = xrpl::RPC::computeBookChanges(lpAccepted);
3166
3167 auto it = streamMaps_[SBookChanges].begin();
3168 while (it != streamMaps_[SBookChanges].end())
3169 {
3170 InfoSub::pointer const p = it->second.lock();
3171 if (p)
3172 {
3173 p->send(jvObj, true);
3174 ++it;
3175 }
3176 else
3177 {
3178 it = streamMaps_[SBookChanges].erase(it);
3179 }
3180 }
3181 }
3182
3183 {
// Function-local flag, read and written while subLock_ is held: the block
// below runs only on the first call of this function.
3184 static bool kFirstTime = true;
3185 if (kFirstTime)
3186 {
3187 // First validated ledger, start delayed SubAccountHistory
3188 kFirstTime = false;
3189 for (auto& outer : subAccountHistory_)
3190 {
3191 for (auto& inner : outer.second)
3192 {
3193 auto& subInfo = inner.second;
// Only subscriptions whose separationLedgerSeq is still 0 are started here.
3194 if (subInfo.index->separationLedgerSeq == 0)
3195 {
3196 subAccountHistoryStart(alpAccepted->getLedger(), subInfo);
3197 }
3198 }
3199 }
3200 }
3201 }
3202 }
3203
3204 // Don't lock since pubAcceptedTransaction is locking.
3205 for (auto const& accTx : *alpAccepted)
3206 {
3207 JLOG(journal_.trace()) << "pubAccepted: " << accTx->getJson();
// The third argument is true only for the last transaction of the accepted
// ledger.
3208 pubValidatedTransaction(lpAccepted, *accTx, accTx == *(--alpAccepted->end()));
3209 }
3210}
3211
// Builds a ServerFeeSummary from the open ledger's base fee, the TxQ metrics
// for the open ledger and the fee track, and queues a "PubFee" job that calls
// pubServer() when the summary differs from lastFeeSummary_.
// NOTE(review): the function name is on a missing listing line (3213);
// presumably reportFeeChange - confirm.
3212void
3214{
3215 ServerFeeSummary const f{
3216 registry_.get().getOpenLedger().current()->fees().base,
3217 registry_.get().getTxQ().getMetrics(*registry_.get().getOpenLedger().current()),
3218 registry_.get().getFeeTrack()};
3219
3220 // only schedule the job if something has changed
3221 if (f != lastFeeSummary_)
3222 {
3223 jobQueue_.addJob(JtClientFeeChange, "PubFee", [this]() { pubServer(); });
3224 }
3225}
3226
// Queues a "PubCons" job that calls pubConsensus(phase); `phase` is captured
// by value.
// NOTE(review): the signature is on a missing listing line (3228); the name is
// presumably reportConsensusStateChange - confirm.
3227void
3229{
3230 jobQueue_.addJob(JtClientConsensus, "PubCons", [this, phase]() { pubConsensus(phase); });
3231}
3232
// Forwards `view` to localTX_->sweep().
// NOTE(review): the signature is on a missing listing line (3234); the name is
// presumably updateLocalTx - confirm.
3233inline void
3235{
3236 localTX_->sweep(view);
3237}
// Returns localTX_->size().
// NOTE(review): the signature is on a missing listing line (3239); the name is
// presumably getLocalTxCount - confirm.
3238inline std::size_t
3240{
3241 return localTX_->size();
3242}
3243
// Returns the sum of the per-book subscriber map sizes over all entries of
// subBook_, computed while holding subLock_.
// NOTE(review): the return type and function name are on missing listing
// lines (3244-3245) - confirm against the original file.
3246{
3247 std::scoped_lock const sl(subLock_);
3248 std::size_t total = 0;
3249 for (auto const& [_, subs] : subBook_)
3250 total += subs.size();
3251 return total;
3252}
3253
3254// This routine should only be used to publish accepted or validated
3255// transactions.
// Builds the per-API-version JSON message for one transaction.
//   transaction - the transaction being published.
//   result      - its result code; also rendered as token and human text.
//   validated   - selects the validated ("closed") or proposed message shape.
//   ledger      - the view the transaction belongs to.
//   meta        - used below as an optional holding a reference-like wrapper
//                 (`meta->get()`); its declaration is on a missing line.
// Returns a MultiApiJson (`multiObj`) finished for each API version.
// NOTE(review): the return type and name (listing lines 3256-3257), the last
// parameter (3262), the declaration of `jvObj` (3264), two accountFunds
// arguments (3332-3333) and the call that receives `multiObj.visit()` (3341)
// are on lines missing from this listing.
3258 std::shared_ptr<STTx const> const& transaction,
3259 TER result,
3260 bool validated,
3261 std::shared_ptr<ReadView const> const& ledger,
3263{
3265 std::string sToken;
3266 std::string sHuman;
3267
3268 transResultInfo(result, sToken, sHuman);
3269
3270 jvObj[jss::type] = "transaction";
3271 // NOTE jvObj is not a finished object for either API version. After
3272 // it's populated, we need to finish it for a specific API version. This is
3273 // done in a loop, near the end of this function.
3274 jvObj[jss::transaction] = transaction->getJson(JsonOptions::Values::DisableApiPriorV2, false);
3275
// Metadata, when supplied, is added together with the delivered amount, NFT
// synthetic fields and MPToken issuance ID inserted by the RPC helpers.
3276 if (meta)
3277 {
3278 jvObj[jss::meta] = meta->get().getJson(JsonOptions::Values::None);
3279 RPC::insertDeliveredAmount(jvObj[jss::meta], *ledger, transaction, meta->get());
3280 RPC::insertNFTSyntheticInJson(jvObj, transaction, meta->get());
3281 RPC::insertMPTokenIssuanceID(jvObj[jss::meta], transaction, meta->get());
3282 }
3283
3284 // add CTID where the needed data for it exists
3285 if (auto const& lookup = ledger->txRead(transaction->getTransactionID());
3286 lookup.second && lookup.second->isFieldPresent(sfTransactionIndex))
3287 {
3288 uint32_t const txnSeq = lookup.second->getFieldU32(sfTransactionIndex);
// The network ID comes from the network ID service, overridden by the
// transaction's own sfNetworkID field when that field is present.
3289 uint32_t netID = registry_.get().getNetworkIDService().getNetworkID();
3290 if (transaction->isFieldPresent(sfNetworkID))
3291 netID = transaction->getFieldU32(sfNetworkID);
3292
3293 if (std::optional<std::string> ctid = RPC::encodeCTID(ledger->header().seq, txnSeq, netID);
3294 ctid)
3295 jvObj[jss::ctid] = *ctid;
3296 }
// ledger_hash is only reported for ledgers that are not open.
3297 if (!ledger->open())
3298 jvObj[jss::ledger_hash] = to_string(ledger->header().hash);
3299
3300 if (validated)
3301 {
3302 jvObj[jss::ledger_index] = ledger->header().seq;
3303 jvObj[jss::transaction][jss::date] = ledger->header().closeTime.time_since_epoch().count();
3304 jvObj[jss::validated] = true;
3305 jvObj[jss::close_time_iso] = toStringIso(ledger->header().closeTime);
3306
3307 // WRITEME: Put the account next seq here
3308 }
3309 else
3310 {
3311 jvObj[jss::validated] = false;
3312 jvObj[jss::ledger_current_index] = ledger->header().seq;
3313 }
3314
3315 jvObj[jss::status] = validated ? "closed" : "proposed";
3316 jvObj[jss::engine_result] = sToken;
3317 jvObj[jss::engine_result_code] = result;
3318 jvObj[jss::engine_result_message] = sHuman;
3319
3320 if (transaction->getTxnType() == ttOFFER_CREATE)
3321 {
3322 auto const account = transaction->getAccountID(sfAccount);
3323 auto const amount = transaction->getFieldAmount(sfTakerGets);
3324
3325 // If the offer create is not self funded then add the owner balance
3326 if (account != amount.getIssuer())
3327 {
3328 auto const ownerFunds = accountFunds(
3329 *ledger,
3330 account,
3331 amount,
3334 registry_.get().getJournal("View"));
3335 jvObj[jss::transaction][jss::owner_funds] = ownerFunds.getText();
3336 }
3337 }
3338
3339 std::string const hash = to_string(transaction->getTransactionID());
3340 MultiApiJson multiObj{jvObj};
// Finish the object per API version: for Version > 1 the "transaction" member
// is moved to "tx_json" and the hash is placed at the top level; otherwise the
// hash is placed inside "transaction".
3342 multiObj.visit(), //
3343 [&]<unsigned Version>(json::Value& jvTx, std::integral_constant<unsigned, Version>) {
3344 RPC::insertDeliverMax(jvTx[jss::transaction], transaction->getTxnType(), Version);
3345
3346 if constexpr (Version > 1)
3347 {
3348 jvTx[jss::tx_json] = jvTx.removeMember(jss::transaction);
3349 jvTx[jss::hash] = hash;
3350 }
3351 else
3352 {
3353 jvTx[jss::transaction][jss::hash] = hash;
3354 }
3355 });
3356
3357 return multiObj;
3358}
3359
// Publishes one transaction of an accepted ledger to the live subscribers in
// streamMaps_[STransactions] and streamMaps_[SRtTransactions], then to book
// subscribers (successful transactions only) and to account subscribers.
//   ledger      - the ledger containing the transaction.
//   transaction - the accepted transaction with its metadata and result.
//   last        - forwarded unchanged to pubAccountTransaction.
// NOTE(review): the function name is on a missing listing line (3361);
// presumably pubValidatedTransaction - confirm.
3360void
3362 std::shared_ptr<ReadView const> const& ledger,
3363 AcceptedLedgerTx const& transaction,
3364 bool last)
3365{
3366 auto const& stTxn = transaction.getTxn();
3367
3368 // Create two different Json objects, for different API versions
3369 auto const metaRef = std::ref(transaction.getMeta());
3370 auto const trResult = transaction.getResult();
3371 MultiApiJson const jvObj = transJson(stTxn, trResult, true, ledger, metaRef);
3372
3373 {
3374 std::scoped_lock const sl(subLock_);
3375
// First pass: subscribers in the STransactions map. Expired entries are erased
// during the walk.
3376 auto it = streamMaps_[STransactions].begin();
3377 while (it != streamMaps_[STransactions].end())
3378 {
3379 InfoSub::pointer p = it->second.lock();
3380
3381 if (p)
3382 {
3383 jvObj.visit(
3384 p->getApiVersion(), //
3385 [&](json::Value const& jv) { p->send(jv, true); });
3386 ++it;
3387 }
3388 else
3389 {
3390 it = streamMaps_[STransactions].erase(it);
3391 }
3392 }
3393
// Second pass: the same message goes to subscribers in the SRtTransactions
// map.
3394 it = streamMaps_[SRtTransactions].begin();
3395
3396 while (it != streamMaps_[SRtTransactions].end())
3397 {
3398 InfoSub::pointer p = it->second.lock();
3399
3400 if (p)
3401 {
3402 jvObj.visit(
3403 p->getApiVersion(), //
3404 [&](json::Value const& jv) { p->send(jv, true); });
3405 ++it;
3406 }
3407 else
3408 {
3409 it = streamMaps_[SRtTransactions].erase(it);
3410 }
3411 }
3412 }
3413
// Book subscribers are only notified for transactions whose result is
// tesSUCCESS.
3414 if (transaction.getResult() == tesSUCCESS)
3415 pubBookTransaction(transaction, jvObj);
3416
3417 pubAccountTransaction(ledger, transaction, last);
3418}
3419
// Sends `jvObj` to every distinct live subscriber of the books affected by
// `alTx` (both names are taken from the body). Returns early when
// affectedBooks() yields no books.
// NOTE(review): the signature (listing line 3421) and the declarations of
// `listeners` and `seen` (3445-3446) are on lines missing from this listing.
3420void
3422{
3423 auto const books = affectedBooks(alTx, journal_);
3424 if (books.empty())
3425 return;
3426
3427 // Two-pass design:
3428 //
3429 // 1. Under subLock_, walk subBook_, collect a strong pointer for each
3430 // unique listener (and prune any expired weak_ptrs we encounter).
3431 // 2. Release subLock_, then send to each collected listener.
3432 //
3433 // Reasoning:
3434 // * send() can be slow / blocking, so holding subLock_ across it would
3435 // stall every other sub/unsub/pub path on this server (see the matching
3436 // TODO above pubServer at line ~2275).
3437 // * A strong pointer destructed while subLock_ is held risks running
3438 // ~InfoSub() in-line, which re-enters unsubBook() and mutates the very
3439 // subBook_/SubMapType being iterated -> dangling iterator UB.
3440 //
3441 // Releasing subLock_ before any InfoSub::pointer can decay solves both.
3442 // ~InfoSub() reacquires subLock_ via unsubBook() on its own and serializes
3443 // safely with concurrent traffic.
3444
3447
3448 // Sized for the common case where every affected book has at most
3449 // one subscriber. Multi-subscriber books trigger reallocation, but
3450 // that is rare and the upper-bound estimate (sum of per-book sizes)
3451 // would itself require walking subBook_ twice.
3452 listeners.reserve(books.size());
3453 seen.reserve(books.size());
3454
3455 {
3456 std::scoped_lock const sl(subLock_);
3457
3458 for (auto const& book : books)
3459 {
3460 auto it = subBook_.find(book);
3461 if (it == subBook_.end())
3462 continue;
3463
3464 for (auto sit = it->second.begin(); sit != it->second.end();)
3465 {
3466 if (auto p = sit->second.lock())
3467 {
3468 // Defensive: subBook_ entries are normally cleared by
3469 // ~InfoSub() -> unsubBook(), so we rarely see expired
3470 // weak_ptrs here. The else branch covers the narrow race
3471 // where the last strong ref is dropped between insertion
3472 // and our lock() call.
// `seen` de-duplicates by subscriber sequence number, so a listener subscribed
// to several affected books is collected only once.
3473 if (seen.emplace(p->getSeq()).second)
3474 listeners.emplace_back(std::move(p));
3475 ++sit;
3476 }
3477 else
3478 {
3479 JLOG(journal_.debug())
3480 << "pubBookTransaction: pruning expired weak_ptr for seq=" << sit->first;
3481 sit = it->second.erase(sit);
3482 }
3483 }
3484
// Drop the book entry once its subscriber map has become empty.
3485 if (it->second.empty())
3486 subBook_.erase(it);
3487 }
3488 }
3489
3490 for (auto const& p : listeners)
3491 {
3492 jvObj.visit(p->getApiVersion(), [&](json::Value const& jv) { p->send(jv, true); });
3493 }
3494 // listeners destructs here, outside subLock_; ~InfoSub (if any fires)
3495 // will reacquire subLock_ via unsubBook with no iterator hazard.
3496}
3497
// Publishes an accepted transaction to the subscribers of every account it
// affects: real-time account subscribers, accepted account subscribers and
// account-history subscribers.
//   ledger      - the ledger containing the transaction.
//   transaction - the accepted transaction.
//   last        - when true, account_history_boundary is set on the message
//                 before it is sent to account-history subscribers.
// Subscribers are collected under subLock_; the messages are built and sent
// after the lock scope has ended.
// NOTE(review): the function name (listing line 3499) and the declaration of
// `notify` (3504) are on lines missing from this listing.
3498void
3500 std::shared_ptr<ReadView const> const& ledger,
3501 AcceptedLedgerTx const& transaction,
3502 bool last)
3503{
// Counters used only for the trace log below.
3505 int iProposed = 0;
3506 int iAccepted = 0;
3507
3508 std::vector<SubAccountHistoryInfo> accountHistoryNotify;
3509 auto const currLedgerSeq = ledger->seq();
3510 {
3511 std::scoped_lock const sl(subLock_);
3512
3513 if (!subAccount_.empty() || !subRTAccount_.empty() || !subAccountHistory_.empty())
3514 {
3515 for (auto const& affectedAccount : transaction.getAffected())
3516 {
// Real-time account subscribers: collect live ones, erase expired entries.
3517 if (auto simiIt = subRTAccount_.find(affectedAccount);
3518 simiIt != subRTAccount_.end())
3519 {
3520 auto it = simiIt->second.begin();
3521
3522 while (it != simiIt->second.end())
3523 {
3524 InfoSub::pointer const p = it->second.lock();
3525
3526 if (p)
3527 {
3528 notify.insert(p);
3529 ++it;
3530 ++iProposed;
3531 }
3532 else
3533 {
3534 it = simiIt->second.erase(it);
3535 }
3536 }
3537 }
3538
// Accepted account subscribers: same collection and pruning.
3539 if (auto simiIt = subAccount_.find(affectedAccount); simiIt != subAccount_.end())
3540 {
3541 auto it = simiIt->second.begin();
3542 while (it != simiIt->second.end())
3543 {
3544 InfoSub::pointer const p = it->second.lock();
3545
3546 if (p)
3547 {
3548 notify.insert(p);
3549 ++it;
3550 ++iAccepted;
3551 }
3552 else
3553 {
3554 it = simiIt->second.erase(it);
3555 }
3556 }
3557 }
3558
3559 if (auto historyIt = subAccountHistory_.find(affectedAccount);
3560 historyIt != subAccountHistory_.end())
3561 {
3562 auto& subs = historyIt->second;
3563 auto it = subs.begin();
3564 while (it != subs.end())
3565 {
3566 SubAccountHistoryInfoWeak const& info = it->second;
// A history subscription is skipped (but kept) while this ledger's sequence is
// not beyond its separationLedgerSeq.
3567 if (currLedgerSeq <= info.index->separationLedgerSeq)
3568 {
3569 ++it;
3570 continue;
3571 }
3572
3573 if (auto isSptr = info.sinkWptr.lock(); isSptr)
3574 {
3575 accountHistoryNotify.emplace_back(
3576 SubAccountHistoryInfo{.sink = isSptr, .index = info.index});
3577 ++it;
3578 }
3579 else
3580 {
3581 it = subs.erase(it);
3582 }
3583 }
// Drop the account entry once it has no history subscriptions left.
3584 if (subs.empty())
3585 subAccountHistory_.erase(historyIt);
3586 }
3587 }
3588 }
3589 }
3590
3591 JLOG(journal_.trace()) << "pubAccountTransaction: "
3592 << "proposed=" << iProposed << ", accepted=" << iAccepted;
3593
// The JSON is only built when at least one recipient was collected.
3594 if (!notify.empty() || !accountHistoryNotify.empty())
3595 {
3596 auto const& stTxn = transaction.getTxn();
3597
3598 // Create two different Json objects, for different API versions
3599 auto const metaRef = std::ref(transaction.getMeta());
3600 auto const trResult = transaction.getResult();
3601 MultiApiJson jvObj = transJson(stTxn, trResult, true, ledger, metaRef);
3602
3603 for (InfoSub::ref isrListener : notify)
3604 {
3605 jvObj.visit(
3606 isrListener->getApiVersion(), //
3607 [&](json::Value const& jv) { isrListener->send(jv, true); });
3608 }
3609
// The fields set from here on are added after the plain account subscribers
// have been sent their copy, so only account-history subscribers receive
// them.
3610 if (last)
3611 jvObj.set(jss::account_history_boundary, true);
3612
3613 XRPL_ASSERT(
3614 jvObj.isMember(jss::account_history_tx_stream) == MultiApiJson::IsMemberResult::None,
3615 "xrpl::NetworkOPsImp::pubAccountTransaction : "
3616 "account_history_tx_stream not set");
3617 for (auto& info : accountHistoryNotify)
3618 {
3619 auto& index = info.index;
3620 if (index->forwardTxIndex == 0 && !index->haveHistorical)
3621 jvObj.set(jss::account_history_tx_first, true);
3622
// forwardTxIndex is post-incremented: the message carries the current value
// and the subscription's counter advances by one.
3623 jvObj.set(jss::account_history_tx_index, index->forwardTxIndex++);
3624
3625 jvObj.visit(
3626 info.sink->getApiVersion(), //
3627 [&](json::Value const& jv) { info.sink->send(jv, true); });
3628 }
3629 }
3630}
3631
// Publishes a proposed (not validated) transaction to the real-time account
// subscribers of every account returned by tx->getMentionedAccounts().
//   ledger - view passed to transJson to build the message.
//   tx     - the transaction (name taken from the body; its parameter line is
//            missing from this listing).
//   result - the transaction's result code.
// Returns early, without sending anything, when subRTAccount_ is empty.
// NOTE(review): the function name (listing line 3633), the `tx` parameter
// (3635) and the declaration of `notify` (3638) are on lines missing from this
// listing.
3632void
3634 std::shared_ptr<ReadView const> const& ledger,
3636 TER result)
3637{
3639 int iProposed = 0;
3640
// Nothing in the visible body adds to accountHistoryNotify, so the
// account-history loop near the end of this function never runs.
3641 std::vector<SubAccountHistoryInfo> accountHistoryNotify;
3642
3643 {
3644 std::scoped_lock const sl(subLock_);
3645
3646 if (subRTAccount_.empty())
3647 return;
3648
// After the early return above subRTAccount_ is known to be non-empty, so this
// condition always holds.
3649 if (!subAccount_.empty() || !subRTAccount_.empty() || !subAccountHistory_.empty())
3650 {
3651 for (auto const& affectedAccount : tx->getMentionedAccounts())
3652 {
3653 if (auto simiIt = subRTAccount_.find(affectedAccount);
3654 simiIt != subRTAccount_.end())
3655 {
3656 auto it = simiIt->second.begin();
3657
// Collect live subscribers and erase entries whose weak pointer has expired.
3658 while (it != simiIt->second.end())
3659 {
3660 InfoSub::pointer const p = it->second.lock();
3661
3662 if (p)
3663 {
3664 notify.insert(p);
3665 ++it;
3666 ++iProposed;
3667 }
3668 else
3669 {
3670 it = simiIt->second.erase(it);
3671 }
3672 }
3673 }
3674 }
3675 }
3676 }
3677
3678 JLOG(journal_.trace()) << "pubProposedAccountTransaction: " << iProposed;
3679
3680 if (!notify.empty() || !accountHistoryNotify.empty())
3681 {
3682 // Create two different Json objects, for different API versions
3683 MultiApiJson jvObj = transJson(tx, result, false, ledger, std::nullopt);
3684
3685 for (InfoSub::ref isrListener : notify)
3686 {
3687 jvObj.visit(
3688 isrListener->getApiVersion(), //
3689 [&](json::Value const& jv) { isrListener->send(jv, true); });
3690 }
3691
3692 XRPL_ASSERT(
3693 jvObj.isMember(jss::account_history_tx_stream) == MultiApiJson::IsMemberResult::None,
3694 "xrpl::NetworkOPs::pubProposedAccountTransaction : "
3695 "account_history_tx_stream not set");
3696 for (auto& info : accountHistoryNotify)
3697 {
3698 auto& index = info.index;
3699 if (index->forwardTxIndex == 0 && !index->haveHistorical)
3700 jvObj.set(jss::account_history_tx_first, true);
3701 jvObj.set(jss::account_history_tx_index, index->forwardTxIndex++);
3702 jvObj.visit(
3703 info.sink->getApiVersion(), //
3704 [&](json::Value const& jv) { info.sink->send(jv, true); });
3705 }
3706 }
3707}
3708
3709//
3710// Monitoring
3711//
3712
// Subscribes `isrListener` to every account in `vnaAccountIDs`.
//   rt - selects the map to update: subRTAccount_ when true, subAccount_
//        otherwise; it is also forwarded to insertSubAccountInfo.
// The listener is updated first, without subLock_; the server-side map is then
// updated under subLock_.
// NOTE(review): the function name is on a missing listing line (3714);
// "subAccount" is taken from the trace message below - confirm.
3713void
3715 InfoSub::ref isrListener,
3716 hash_set<AccountID> const& vnaAccountIDs,
3717 bool rt)
3718{
3719 SubInfoMapType& subMap = rt ? subRTAccount_ : subAccount_;
3720
3721 for (auto const& naAccountID : vnaAccountIDs)
3722 {
3723 JLOG(journal_.trace()) << "subAccount: account: " << toBase58(naAccountID);
3724
3725 isrListener->insertSubAccountInfo(naAccountID, rt);
3726 }
3727
3728 std::scoped_lock const sl(subLock_);
3729
3730 for (auto const& naAccountID : vnaAccountIDs)
3731 {
3732 auto simIterator = subMap.find(naAccountID);
3733 if (simIterator == subMap.end())
3734 {
3735 // Not found, note that account has a new single listener.
3736 SubMapType usisElement;
3737 usisElement[isrListener->getSeq()] = isrListener;
3738 // VFALCO NOTE This is making a needless copy of naAccountID
3739 subMap.insert(simIterator, make_pair(naAccountID, usisElement));
3740 }
3741 else
3742 {
3743 // Found, note that the account has another listener.
// Keyed by the listener's sequence number, so subscribing the same listener
// again overwrites its existing entry.
3744 simIterator->second[isrListener->getSeq()] = isrListener;
3745 }
3746 }
3747}
3748
// Unsubscribes `isrListener` from every account in `vnaAccountIDs`: first
// removes the accounts from the listener itself, then removes the listener
// from the server-side map via unsubAccountInternal. `rt` is forwarded to both
// steps.
// NOTE(review): the function name is on a missing listing line (3750);
// presumably unsubAccount - confirm.
3749void
3751 InfoSub::ref isrListener,
3752 hash_set<AccountID> const& vnaAccountIDs,
3753 bool rt)
3754{
3755 for (auto const& naAccountID : vnaAccountIDs)
3756 {
3757 // Remove from the InfoSub
3758 isrListener->deleteSubAccountInfo(naAccountID, rt);
3759 }
3760
3761 // Remove from the server
3762 unsubAccountInternal(isrListener->getSeq(), vnaAccountIDs, rt);
3763}
3764
// Server-side half of an account unsubscribe: under subLock_, erases the
// listener identified by uSeq from each listed account's listener map
// (subRTAccount_ when rt is true, subAccount_ otherwise) and drops the
// account entry once no listeners remain. Accounts that are not present in
// the map are ignored.
// NOTE(review): the line carrying the function name is absent from this
// listing (numbering jumps from 3765 to 3767).
3765 void
3767 std::uint64_t uSeq,
3768 hash_set<AccountID> const& vnaAccountIDs,
3769 bool rt)
3770 {
3771 std::scoped_lock const sl(subLock_);
3772
3773 SubInfoMapType& subMap = rt ? subRTAccount_ : subAccount_;
3774
3775 for (auto const& naAccountID : vnaAccountIDs)
3776 {
3777 auto simIterator = subMap.find(naAccountID);
3778
3779 if (simIterator != subMap.end())
3780 {
3781 // Found
3782 simIterator->second.erase(uSeq);
3783
3784 if (simIterator->second.empty())
3785 {
3786 // Don't need hash entry.
3787 subMap.erase(simIterator);
3788 }
3789 }
3790 }
3791 }
3792
// Queues a "HistTxStream" job that streams an account's historical
// transactions to a subscriber, walking backwards from
// subInfo.index->historyLastLedgerSeq in windows of at most 1024 ledgers
// until ledger 2 is passed, the account's first transaction is found, the
// subscriber goes away, or index->stopHistorical becomes true.
// The job lambda captures `this` and a copy of subInfo.
// NOTE(review): several lines are absent from this listing (3794, 3868,
// 3870-3871, 3873, 3922, 3926), including the signature and the declaration
// of `marker`; comments below describe only what is visible.
3793 void
3795 {
3796 registry_.get().getJobQueue().addJob(JtClientAcctHist, "HistTxStream", [this, subInfo]() {
3797 auto const& accountId = subInfo.index->accountId;
// Both of these are references into the shared index, so progress made by
// this job is written back to it.
3798 auto& lastLedgerSeq = subInfo.index->historyLastLedgerSeq;
3799 auto& txHistoryIndex = subInfo.index->historyTxIndex;
3800
3801 JLOG(journal_.trace()) << "AccountHistory job for account " << toBase58(accountId)
3802 << " started. lastLedgerSeq=" << lastLedgerSeq;
3803
// Returns true when tx is the first transaction of the account.
3804 auto isFirstTx = [&](std::shared_ptr<Transaction> const& tx,
3805 std::shared_ptr<TxMeta> const& meta) -> bool {
3806 /*
3807 * genesis account: first tx is the one with seq 1
3808 * other account: first tx is the one created the account
3809 */
3810 if (accountId == kGenesisAccountId)
3811 {
3812 auto stx = tx->getSTransaction();
3813 if (stx->getAccountID(sfAccount) == accountId && stx->getSeqValue() == 1)
3814 return true;
3815 }
3816
// Look for an ACCOUNT_ROOT node with sfNewFields whose sfAccount is this
// account.
3817 for (auto& node : meta->getNodes())
3818 {
3819 if (node.getFieldU16(sfLedgerEntryType) != ltACCOUNT_ROOT)
3820 continue;
3821
3822 if (node.isFieldPresent(sfNewFields))
3823 {
3824 if (auto inner = dynamic_cast<STObject const*>(node.peekAtPField(sfNewFields));
3825 inner)
3826 {
3827 if (inner->isFieldPresent(sfAccount) &&
3828 inner->getAccountID(sfAccount) == accountId)
3829 {
3830 return true;
3831 }
3832 }
3833 }
3834 }
3835
3836 return false;
3837 };
3838
// Sends a single JSON value to the subscriber if it is still alive and
// optionally unsubscribes it afterwards. Returns false when the weak pointer
// has expired.
3839 auto send = [&](json::Value const& jvObj, bool unsubscribe) -> bool {
3840 if (auto sptr = subInfo.sinkWptr.lock())
3841 {
3842 sptr->send(jvObj, true);
3843 if (unsubscribe)
3844 unsubAccountHistory(sptr, accountId, false);
3845 return true;
3846 }
3847
3848 return false;
3849 };
3850
// Same as `send`, but picks the JSON variant matching the subscriber's API
// version.
3851 auto sendMultiApiJson = [&](MultiApiJson const& jvObj, bool unsubscribe) -> bool {
3852 if (auto sptr = subInfo.sinkWptr.lock())
3853 {
3854 jvObj.visit(
3855 sptr->getApiVersion(), //
3856 [&](json::Value const& jv) { sptr->send(jv, true); });
3857
3858 if (unsubscribe)
3859 unsubAccountHistory(sptr, accountId, false);
3860 return true;
3861 }
3862
3863 return false;
3864 };
3865
// Fetches one page of the account's transactions in [minLedger, maxLedger]
// via newestAccountTxPage, continuing from `marker`.
// NOTE(review): parts of this lambda's signature and the options declaration
// are absent from this listing.
3866 auto getMoreTxns = [&](std::uint32_t minLedger,
3867 std::uint32_t maxLedger,
3869 -> std::pair<
3872 auto& db = registry_.get().getRelationalDatabase();
3874 .account = accountId,
3875 .ledgerRange = {.min = minLedger, .max = maxLedger},
3876 .marker = marker,
3877 .limit = 0,
3878 .bAdmin = true};
3879 return db.newestAccountTxPage(options);
3880 };
3881
3882 /*
3883 * search backward until the genesis ledger or asked to stop
3884 */
3885 while (lastLedgerSeq >= 2 && !subInfo.index->stopHistorical)
3886 {
// NOTE(review): feeChargeCount is declared inside the loop, so it is reset
// on every iteration and the "no InfoSub" log below always reports 0.
3887 int feeChargeCount = 0;
3888 if (auto sptr = subInfo.sinkWptr.lock(); sptr)
3889 {
3890 sptr->getConsumer().charge(Resource::kFeeMediumBurdenRpc);
3891 ++feeChargeCount;
3892 }
3893 else
3894 {
3895 JLOG(journal_.trace())
3896 << "AccountHistory job for account " << toBase58(accountId)
3897 << " no InfoSub. Fee charged " << feeChargeCount << " times.";
3898 return;
3899 }
3900
3901 // try to search in 1024 ledgers till reaching genesis ledgers
3902 auto startLedgerSeq = (lastLedgerSeq > 1024 + 2 ? lastLedgerSeq - 1024 : 2);
3903 JLOG(journal_.trace())
3904 << "AccountHistory job for account " << toBase58(accountId)
3905 << ", working on ledger range [" << startLedgerSeq << "," << lastLedgerSeq << "]";
3906
// True only when the validated range reported by LedgerMaster covers the
// whole window.
3907 auto haveRange = [&]() -> bool {
3908 std::uint32_t validatedMin = UINT_MAX;
3909 std::uint32_t validatedMax = 0;
3910 auto haveSomeValidatedLedgers =
3911 registry_.get().getLedgerMaster().getValidatedRange(validatedMin, validatedMax);
3912
3913 return haveSomeValidatedLedgers && validatedMin <= startLedgerSeq &&
3914 lastLedgerSeq <= validatedMax;
3915 }();
3916
3917 if (!haveRange)
3918 {
3919 JLOG(journal_.debug()) << "AccountHistory reschedule job for account "
3920 << toBase58(accountId) << ", incomplete ledger range ["
3921 << startLedgerSeq << "," << lastLedgerSeq << "]";
// NOTE(review): line 3922 is absent from this listing; presumably it performs
// the reschedule that the log message mentions — confirm against the file.
3923 return;
3924 }
3925
// Page through the current window until the database returns no marker.
3927 while (!subInfo.index->stopHistorical)
3928 {
3929 auto dbResult = getMoreTxns(startLedgerSeq, lastLedgerSeq, marker);
3930
3931 auto const& txns = dbResult.first;
3932 marker = dbResult.second;
3933 size_t const numTxns = txns.size();
3934 for (size_t i = 0; i < numTxns; ++i)
3935 {
3936 auto const& [tx, meta] = txns[i];
3937
3938 if (!tx || !meta)
3939 {
3940 JLOG(journal_.debug()) << "AccountHistory job for account "
3941 << toBase58(accountId) << " empty tx or meta.";
3942 send(rpcError(RpcInternal), true);
3943 return;
3944 }
3945 auto curTxLedger =
3946 registry_.get().getLedgerMaster().getLedgerBySeq(tx->getLedger());
3947 if (!curTxLedger)
3948 {
3949 // LCOV_EXCL_START
3950 UNREACHABLE(
3951 "xrpl::NetworkOPsImp::addAccountHistoryJob : "
3952 "getLedgerBySeq failed");
3953 JLOG(journal_.debug()) << "AccountHistory job for account "
3954 << toBase58(accountId) << " no ledger.";
3955 send(rpcError(RpcInternal), true);
3956 return;
3957 // LCOV_EXCL_STOP
3958 }
3959 std::shared_ptr<STTx const> const stTxn = tx->getSTransaction();
3960 if (!stTxn)
3961 {
3962 // LCOV_EXCL_START
3963 UNREACHABLE(
3964 "NetworkOPsImp::addAccountHistoryJob : "
3965 "getSTransaction failed");
3966 JLOG(journal_.debug()) << "AccountHistory job for account "
3967 << toBase58(accountId) << " getSTransaction failed.";
3968 send(rpcError(RpcInternal), true);
3969 return;
3970 // LCOV_EXCL_STOP
3971 }
3972
3973 auto const ref = std::ref(*meta);
3974 auto const trR = meta->getResultTER();
3975 MultiApiJson jvTx = transJson(stTxn, trR, true, curTxLedger, ref);
3976
// The historical index counts down as older transactions are sent.
3977 jvTx.set(jss::account_history_tx_index, txHistoryIndex--);
// Mark the last transaction of each ledger within the page.
// NOTE(review): txns[i + 1].first is dereferenced here before that element
// has gone through the `!tx || !meta` check above; a null entry at i + 1
// would be dereferenced.
3978 if (i + 1 == numTxns || txns[i + 1].first->getLedger() != tx->getLedger())
3979 jvTx.set(jss::account_history_boundary, true);
3980
3981 if (isFirstTx(tx, meta))
3982 {
3983 jvTx.set(jss::account_history_tx_first, true);
3984 sendMultiApiJson(jvTx, false);
3985
3986 JLOG(journal_.trace()) << "AccountHistory job for account "
3987 << toBase58(accountId) << " done, found last tx.";
3988 return;
3989 }
3990
3991 sendMultiApiJson(jvTx, false);
3992 }
3993
3994 if (marker)
3995 {
3996 JLOG(journal_.trace())
3997 << "AccountHistory job for account " << toBase58(accountId)
3998 << " paging, marker=" << marker->ledgerSeq << ":" << marker->txnSeq;
3999 }
4000 else
4001 {
4002 break;
4003 }
4004 }
4005
// Move the window down; stop once ledger 1 or lower would be next.
4006 if (!subInfo.index->stopHistorical)
4007 {
4008 lastLedgerSeq = startLedgerSeq - 1;
4009 if (lastLedgerSeq <= 1)
4010 {
4011 JLOG(journal_.trace())
4012 << "AccountHistory job for account " << toBase58(accountId)
4013 << " done, reached genesis ledger.";
4014 return;
4015 }
4016 }
4017 }
4018 });
4019 }
4020
// Starts historical streaming for one account-history subscription, using
// `ledger` as the separation point between historical and forward streaming.
// Records ledger->seq() in index->separationLedgerSeq, then returns without
// queuing a job when the account does not exist in `ledger`, or when it is
// the genesis account and its sfSequence is still 1. Otherwise sets
// historyLastLedgerSeq and haveHistorical and calls addAccountHistoryJob.
// NOTE(review): the name line and the second parameter line are absent from
// this listing (4022, 4024).
4021 void
4023 std::shared_ptr<ReadView const> const& ledger,
4025 {
4026 subInfo.index->separationLedgerSeq = ledger->seq();
4027 auto const& accountId = subInfo.index->accountId;
4028 auto const accountKeylet = keylet::account(accountId);
4029 if (!ledger->exists(accountKeylet))
4030 {
4031 JLOG(journal_.debug()) << "subAccountHistoryStart, no account " << toBase58(accountId)
4032 << ", no need to add AccountHistory job.";
4033 return;
4034 }
4035 if (accountId == kGenesisAccountId)
4036 {
4037 if (auto const sleAcct = ledger->read(accountKeylet); sleAcct)
4038 {
4039 if (sleAcct->getFieldU32(sfSequence) == 1)
4040 {
4041 JLOG(journal_.debug())
4042 << "subAccountHistoryStart, genesis account " << toBase58(accountId)
4043 << " does not have tx, no need to add AccountHistory job.";
4044 return;
4045 }
4046 }
4047 else
4048 {
4049 // LCOV_EXCL_START
4050 UNREACHABLE(
4051 "xrpl::NetworkOPsImp::subAccountHistoryStart : failed to "
4052 "access genesis account");
4053 return;
4054 // LCOV_EXCL_STOP
4055 }
4056 }
4057 subInfo.index->historyLastLedgerSeq = ledger->seq();
4058 subInfo.index->haveHistorical = true;
4059
4060 JLOG(journal_.debug()) << "subAccountHistoryStart, add AccountHistory job: accountId="
4061 << toBase58(accountId) << ", currentLedgerSeq=" << ledger->seq();
4062
4063 addAccountHistoryJob(subInfo);
4064 }
4065
4068{
4069 if (!isrListener->insertSubAccountHistory(accountId))
4070 {
4071 JLOG(journal_.debug()) << "subAccountHistory, already subscribed to account "
4072 << toBase58(accountId);
4073 return RpcInvalidParams;
4074 }
4075
4076 std::scoped_lock const sl(subLock_);
4078 .sinkWptr = isrListener, .index = std::make_shared<SubAccountHistoryIndex>(accountId)};
4079 auto simIterator = subAccountHistory_.find(accountId);
4080 if (simIterator == subAccountHistory_.end())
4081 {
4083 inner.emplace(isrListener->getSeq(), ahi);
4084 subAccountHistory_.insert(simIterator, std::make_pair(accountId, inner));
4085 }
4086 else
4087 {
4088 simIterator->second.emplace(isrListener->getSeq(), ahi);
4089 }
4090
4091 auto const ledger = registry_.get().getLedgerMaster().getValidatedLedger();
4092 if (ledger)
4093 {
4094 subAccountHistoryStart(ledger, ahi);
4095 }
4096 else
4097 {
4098 // The node does not have validated ledgers, so wait for
4099 // one before start streaming.
4100 // In this case, the subscription is also considered successful.
4101 JLOG(journal_.debug()) << "subAccountHistory, no validated ledger yet, delay start";
4102 }
4103
4104 return RpcSuccess;
4105}
4106
// Unsubscribes a listener from an account's history stream.
// When historyOnly is false the listener's own bookkeeping is cleared first
// (deleteSubAccountHistory); in both cases the server-side work is delegated
// to unsubAccountHistoryInternal.
// NOTE(review): the line carrying the function name is absent from this
// listing (numbering jumps from 4107 to 4109).
4107 void
4109 InfoSub::ref isrListener,
4110 AccountID const& account,
4111 bool historyOnly)
4112 {
4113 if (!historyOnly)
4114 isrListener->deleteSubAccountHistory(account);
4115 unsubAccountHistoryInternal(isrListener->getSeq(), account, historyOnly);
4116 }
4117
// Server-side half of an account-history unsubscribe, run under subLock_.
// If the account has subscribers, sets stopHistorical on the entry for `seq`
// (when present); when historyOnly is false it also erases that entry and
// drops the account once its map is empty. Nothing happens, and nothing is
// logged, when the account has no subscribers.
// NOTE(review): the line carrying the function name is absent from this
// listing (numbering jumps from 4118 to 4120).
4118 void
4120 std::uint64_t seq,
4121 AccountID const& account,
4122 bool historyOnly)
4123 {
4124 std::scoped_lock const sl(subLock_);
4125 auto simIterator = subAccountHistory_.find(account);
4126 if (simIterator != subAccountHistory_.end())
4127 {
4128 auto& subInfoMap = simIterator->second;
4129 auto subInfoIter = subInfoMap.find(seq);
4130 if (subInfoIter != subInfoMap.end())
4131 {
// Signals a running history job for this subscription to stop.
4132 subInfoIter->second.index->stopHistorical = true;
4133 }
4134
4135 if (!historyOnly)
4136 {
4137 simIterator->second.erase(seq);
4138 if (simIterator->second.empty())
4139 {
4140 subAccountHistory_.erase(simIterator);
4141 }
4142 }
4143 JLOG(journal_.debug()) << "unsubAccountHistory, account " << toBase58(account)
4144 << ", historyOnly = " << (historyOnly ? "true" : "false");
4145 }
4146 }
4147
// Subscribes a listener to an order book: registers it in subBook_[book]
// under subLock_ (try_emplace leaves an existing entry for the same sequence
// untouched), then records the book on the listener. Always returns true.
// NOTE(review): the signature line is absent from this listing (4149).
4148 bool
4150 {
4151 // Server-side insert first, then InfoSub bookkeeping. If the InfoSub-side
4152 // insert throws, the orphan in subBook_ is cleared by the expired-weak_ptr
4153 // prune in pubBookTransaction. With the reverse ordering, ~InfoSub would
4154 // call unsubBookInternal for a key that was never inserted server-side.
4155 {
4156 std::scoped_lock const sl(subLock_);
4157 subBook_[book].try_emplace(isrListener->getSeq(), isrListener);
4158 }
4159 isrListener->insertBookSubscription(book);
4160 return true;
4161 }
4162
// Unsubscribes a listener from an order book and returns the result of
// unsubBookInternal for the listener's sequence number.
// NOTE(review): the signature line is absent from this listing (4164).
4163 bool
4165 {
4166 // Mirrors unsubAccount: clear the per-subscriber tracking set first so
4167 // ~InfoSub does not re-issue an unsubBookInternal for a book the caller
4168 // already removed, then erase the server-side entry.
4169 isrListener->deleteBookSubscription(book);
4170 return unsubBookInternal(isrListener->getSeq(), book);
4171 }
4172
// Under subLock_, erases the entry keyed by uSeq from subBook_[book] and
// drops the book once its listener map is empty.
// Returns false when the book has no entry at all; otherwise true exactly
// when an entry for uSeq was erased.
// NOTE(review): the signature line is absent from this listing (4174).
4173 bool
4175 {
4176 std::scoped_lock const sl(subLock_);
4177 auto it = subBook_.find(book);
4178 if (it == subBook_.end())
4179 return false;
4180 bool const erased = it->second.erase(uSeq) != 0u;
4181 if (it->second.empty())
4182 subBook_.erase(it);
4183 return erased;
4184 }
4185
4188{
4189 // This code-path is exclusively used when the server is in standalone
4190 // mode via `ledger_accept`
4191 XRPL_ASSERT(standalone_, "xrpl::NetworkOPsImp::acceptLedger : is standalone");
4192
4193 if (!standalone_)
4194 Throw<std::runtime_error>("Operation only possible in STANDALONE mode.");
4195
4196 // FIXME Could we improve on this and remove the need for a specialized
4197 // API in Consensus?
4198 beginConsensus(ledgerMaster_.getClosedLedger()->header().hash, {});
4199 consensus_.simulate(registry_.get().getTimeKeeper().closeTime(), consensusDelay);
4200 return ledgerMaster_.getCurrentLedger()->header().seq;
4201}
4202
4203// <-- bool: true=added, false=already there
// Subscribes a listener to the ledger stream and fills jvResult with a
// snapshot of the latest validated ledger (index, hash, close time, fee and
// reserve settings, network id) when one is available. The registration in
// streamMaps_[SLedger] happens under subLock_; the return value is emplace's
// `.second`.
// NOTE(review): the signature line (4205) and the condition guarding the
// validated_ledgers assignment (4221) are absent from this listing.
4204 bool
4206 {
// Despite its name, lpClosed holds the result of getValidatedLedger().
4207 if (auto lpClosed = ledgerMaster_.getValidatedLedger())
4208 {
4209 jvResult[jss::ledger_index] = lpClosed->header().seq;
4210 jvResult[jss::ledger_hash] = to_string(lpClosed->header().hash);
4211 jvResult[jss::ledger_time] =
4212 json::Value::UInt(lpClosed->header().closeTime.time_since_epoch().count());
// fee_ref is only reported while the XRPFees amendment is not enabled.
4213 if (!lpClosed->rules().enabled(featureXRPFees))
4214 jvResult[jss::fee_ref] = kFeeUnitsDeprecated;
4215 jvResult[jss::fee_base] = lpClosed->fees().base.jsonClipped();
4216 jvResult[jss::reserve_base] = lpClosed->fees().reserve.jsonClipped();
4217 jvResult[jss::reserve_inc] = lpClosed->fees().increment.jsonClipped();
4218 jvResult[jss::network_id] = registry_.get().getNetworkIDService().getNetworkID();
4219 }
4220
4222 {
4223 jvResult[jss::validated_ledgers] = registry_.get().getLedgerMaster().getCompleteLedgers();
4224 }
4225
4226 std::scoped_lock const sl(subLock_);
4227 return streamMaps_[SLedger].emplace(isrListener->getSeq(), isrListener).second;
4228 }
4229
4230// <-- bool: true=added, false=already there
// Under subLock_, adds isrListener to streamMaps_[SBookChanges] keyed by its
// sequence number; the result is emplace's `.second`.
// NOTE(review): the signature line is absent from this listing (4232).
4231 bool
4233 {
4234 std::scoped_lock const sl(subLock_);
4235 return streamMaps_[SBookChanges].emplace(isrListener->getSeq(), isrListener).second;
4236 }
4237
4238// <-- bool: true=erased, false=was not there
// Under subLock_, erases key uSeq from streamMaps_[SLedger]; true when an
// entry was removed.
// NOTE(review): the signature line is absent from this listing (4240).
4239 bool
4241 {
4242 std::scoped_lock const sl(subLock_);
4243 return streamMaps_[SLedger].erase(uSeq) != 0u;
4244 }
4245
4246// <-- bool: true=erased, false=was not there
// Under subLock_, erases key uSeq from streamMaps_[SBookChanges]; true when
// an entry was removed.
// NOTE(review): the signature line is absent from this listing (4248).
4247 bool
4249 {
4250 std::scoped_lock const sl(subLock_);
4251 return streamMaps_[SBookChanges].erase(uSeq) != 0u;
4252 }
4253
4254// <-- bool: true=added, false=already there
// Under subLock_, adds isrListener to streamMaps_[SManifests] keyed by its
// sequence number; the result is emplace's `.second`.
// NOTE(review): the signature line is absent from this listing (4256).
4255 bool
4257 {
4258 std::scoped_lock const sl(subLock_);
4259 return streamMaps_[SManifests].emplace(isrListener->getSeq(), isrListener).second;
4260 }
4261
4262// <-- bool: true=erased, false=was not there
// Under subLock_, erases key uSeq from streamMaps_[SManifests]; true when an
// entry was removed.
// NOTE(review): the signature line is absent from this listing (4264).
4263 bool
4265 {
4266 std::scoped_lock const sl(subLock_);
4267 return streamMaps_[SManifests].erase(uSeq) != 0u;
4268 }
4269
4270// <-- bool: true=added, false=already there
4271bool
4272NetworkOPsImp::subServer(InfoSub::ref isrListener, json::Value& jvResult, bool admin)
4273{
4274 uint256 uRandom;
4275
4276 if (standalone_)
4277 jvResult[jss::stand_alone] = standalone_;
4278
4279 // CHECKME: is it necessary to provide a random number here?
4280 beast::rngfill(uRandom.begin(), uRandom.size(), cryptoPrng());
4281
4282 auto const& feeTrack = registry_.get().getFeeTrack();
4283 jvResult[jss::random] = to_string(uRandom);
4284 jvResult[jss::server_status] = strOperatingMode(admin);
4285 jvResult[jss::load_base] = feeTrack.getLoadBase();
4286 jvResult[jss::load_factor] = feeTrack.getLoadFactor();
4287 jvResult[jss::hostid] = getHostId(admin);
4288 jvResult[jss::pubkey_node] =
4289 toBase58(TokenType::NodePublic, registry_.get().getApp().nodeIdentity().first);
4290
4291 std::scoped_lock const sl(subLock_);
4292 return streamMaps_[SServer].emplace(isrListener->getSeq(), isrListener).second;
4293}
4294
4295// <-- bool: true=erased, false=was not there
// Under subLock_, erases key uSeq from streamMaps_[SServer]; true when an
// entry was removed.
// NOTE(review): the signature line is absent from this listing (4297).
4296 bool
4298 {
4299 std::scoped_lock const sl(subLock_);
4300 return streamMaps_[SServer].erase(uSeq) != 0u;
4301 }
4302
4303// <-- bool: true=added, false=already there
// Under subLock_, adds isrListener to streamMaps_[STransactions] keyed by its
// sequence number; the result is emplace's `.second`.
// NOTE(review): the signature line is absent from this listing (4305).
4304 bool
4306 {
4307 std::scoped_lock const sl(subLock_);
4308 return streamMaps_[STransactions].emplace(isrListener->getSeq(), isrListener).second;
4309 }
4310
4311// <-- bool: true=erased, false=was not there
// Under subLock_, erases key uSeq from streamMaps_[STransactions]; true when
// an entry was removed.
// NOTE(review): the signature line is absent from this listing (4313).
4312 bool
4314 {
4315 std::scoped_lock const sl(subLock_);
4316 return streamMaps_[STransactions].erase(uSeq) != 0u;
4317 }
4318
4319// <-- bool: true=added, false=already there
// Under subLock_, adds isrListener to streamMaps_[SRtTransactions] keyed by
// its sequence number; the result is emplace's `.second`.
// NOTE(review): the signature line is absent from this listing (4321).
4320 bool
4322 {
4323 std::scoped_lock const sl(subLock_);
4324 return streamMaps_[SRtTransactions].emplace(isrListener->getSeq(), isrListener).second;
4325 }
4326
4327// <-- bool: true=erased, false=was not there
// Under subLock_, erases key uSeq from streamMaps_[SRtTransactions]; true
// when an entry was removed.
// NOTE(review): the signature line is absent from this listing (4329).
4328 bool
4330 {
4331 std::scoped_lock const sl(subLock_);
4332 return streamMaps_[SRtTransactions].erase(uSeq) != 0u;
4333 }
4334
4335// <-- bool: true=added, false=already there
// Under subLock_, adds isrListener to streamMaps_[SValidations] keyed by its
// sequence number; the result is emplace's `.second`.
// NOTE(review): the signature line is absent from this listing (4337).
4336 bool
4338 {
4339 std::scoped_lock const sl(subLock_);
4340 return streamMaps_[SValidations].emplace(isrListener->getSeq(), isrListener).second;
4341 }
4342
4343void
4348
4349// <-- bool: true=erased, false=was not there
// Under subLock_, erases key uSeq from streamMaps_[SValidations]; true when
// an entry was removed.
// NOTE(review): the signature line is absent from this listing (4351).
4350 bool
4352 {
4353 std::scoped_lock const sl(subLock_);
4354 return streamMaps_[SValidations].erase(uSeq) != 0u;
4355 }
4356
4357// <-- bool: true=added, false=already there
// Under subLock_, adds isrListener to streamMaps_[SPeerStatus] keyed by its
// sequence number; the result is emplace's `.second`.
// NOTE(review): the signature line is absent from this listing (4359).
4358 bool
4360 {
4361 std::scoped_lock const sl(subLock_);
4362 return streamMaps_[SPeerStatus].emplace(isrListener->getSeq(), isrListener).second;
4363 }
4364
4365// <-- bool: true=erased, false=was not there
// Under subLock_, erases key uSeq from streamMaps_[SPeerStatus]; true when an
// entry was removed.
// NOTE(review): the signature line is absent from this listing (4367).
4366 bool
4368 {
4369 std::scoped_lock const sl(subLock_);
4370 return streamMaps_[SPeerStatus].erase(uSeq) != 0u;
4371 }
4372
4373// <-- bool: true=added, false=already there
// Under subLock_, adds isrListener to streamMaps_[SConsensusPhase] keyed by
// its sequence number; the result is emplace's `.second`.
// NOTE(review): the signature line is absent from this listing (4375).
4374 bool
4376 {
4377 std::scoped_lock const sl(subLock_);
4378 return streamMaps_[SConsensusPhase].emplace(isrListener->getSeq(), isrListener).second;
4379 }
4380
4381// <-- bool: true=erased, false=was not there
// Under subLock_, erases key uSeq from streamMaps_[SConsensusPhase]; true
// when an entry was removed.
// NOTE(review): the signature line is absent from this listing (4383).
4382 bool
4384 {
4385 std::scoped_lock const sl(subLock_);
4386 return streamMaps_[SConsensusPhase].erase(uSeq) != 0u;
4387 }
4388
4391{
4392 std::scoped_lock const sl(subLock_);
4393
4394 subRpcMapType::iterator const it = rpcSubMap_.find(strUrl);
4395
4396 if (it != rpcSubMap_.end())
4397 return it->second;
4398
4399 return InfoSub::pointer();
4400}
4401
4404{
4405 std::scoped_lock const sl(subLock_);
4406
4407 rpcSubMap_.emplace(strUrl, rspEntry);
4408
4409 return rspEntry;
4410}
4411
// Removes the RPC subscription registered under strUrl, but only when no
// stream map still contains its sequence number.
// Returns false when strUrl is unknown or the subscription is still
// referenced by a stream map; true after the entry has been erased.
// NOTE(review): subLock_ is taken here and findRpcSub is then called while it
// is held; the lookup block earlier in this listing, which appears to be
// findRpcSub, locks subLock_ as well. That is only safe if subLock_ is a
// recursive mutex — confirm against its declaration.
// NOTE(review): the signature line is absent from this listing (4413).
4412 bool
4414 {
4415 std::scoped_lock const sl(subLock_);
4416 auto pInfo = findRpcSub(strUrl);
4417
4418 if (!pInfo)
4419 return false;
4420
4421 // check to see if any of the stream maps still hold a weak reference to
4422 // this entry before removing
4423 for (SubMapType const& map : streamMaps_)
4424 {
4425 if (map.contains(pInfo->getSeq()))
4426 return false;
4427 }
4428 rpcSubMap_.erase(strUrl);
4429 return true;
4430 }
4431
4432#ifndef USE_NEW_BOOK_PAGE
4433
4434// NIKB FIXME this should be looked at. There's no reason why this shouldn't
4435// work, but it demonstrated poor performance.
4436//
// Old order-book paging path (compiled when USE_NEW_BOOK_PAGE is not
// defined). Walks the book's directories from getBookBase(book) up to
// getQualityNext, reads up to iLimit offers, and appends each one to
// jvResult[jss::offers] with its quality plus, when the offer is not fully
// funded, taker_gets_funded / taker_pays_funded. A running balance per offer
// owner (umBalance) is kept so later offers from the same owner see the funds
// left after earlier ones; owner_funds is reported only on an owner's first
// offer.
// NOTE(review): the name line, the lpLedger parameter line, the declaration
// of umBalance and two accountHolds arguments are absent from this listing
// (4438-4439, 4449, 4555-4556). bProof and jvMarker are not referenced in the
// visible code.
4437 void
4440 Book const& book,
4441 AccountID const& uTakerID,
4442 bool const bProof,
4443 unsigned int iLimit,
4444 json::Value const& jvMarker,
4445 json::Value& jvResult)
4446 { // CAUTION: This is the old get book page logic
4447 json::Value& jvOffers = (jvResult[jss::offers] = json::Value(json::ValueType::Array));
4448
4450 uint256 const uBookBase = getBookBase(book);
4451 uint256 const uBookEnd = getQualityNext(uBookBase);
4452 uint256 uTipIndex = uBookBase;
4453
4454 if (auto stream = journal_.trace())
4455 {
4456 stream << "getBookPage:" << book;
4457 stream << "getBookPage: uBookBase=" << uBookBase;
4458 stream << "getBookPage: uBookEnd=" << uBookEnd;
4459 stream << "getBookPage: uTipIndex=" << uTipIndex;
4460 }
4461
4462 ReadView const& view = *lpLedger;
4463
4464 bool const bGlobalFreeze =
4465 isGlobalFrozen(view, book.out.getIssuer()) || isGlobalFrozen(view, book.in.getIssuer());
4466
// bDirectAdvance: the next iteration must move on to the next directory.
4467 bool bDone = false;
4468 bool bDirectAdvance = true;
4469
4470 SLE::const_pointer sleOfferDir;
4471 uint256 offerIndex;
4472 unsigned int uBookEntry = 0;
4473 STAmount saDirRate;
4474
4475 auto const rate = transferRate(view, book.out.getIssuer());
4476 auto viewJ = registry_.get().getJournal("View");
4477
// Note that an iteration which only advances to a new directory and finds
// none still consumes one unit of iLimit.
4478 while (!bDone && iLimit-- > 0)
4479 {
4480 if (bDirectAdvance)
4481 {
4482 bDirectAdvance = false;
4483
4484 JLOG(journal_.trace()) << "getBookPage: bDirectAdvance";
4485
4486 auto const ledgerIndex = view.succ(uTipIndex, uBookEnd);
4487 if (ledgerIndex)
4488 {
4489 sleOfferDir = view.read(keylet::page(*ledgerIndex));
4490 }
4491 else
4492 {
4493 sleOfferDir.reset();
4494 }
4495
4496 if (!sleOfferDir)
4497 {
4498 JLOG(journal_.trace()) << "getBookPage: bDone";
4499 bDone = true;
4500 }
4501 else
4502 {
// The directory key yields the quality that applies to every offer in it.
4503 uTipIndex = sleOfferDir->key();
4504 saDirRate = amountFromQuality(getQuality(uTipIndex));
4505
4506 cdirFirst(view, uTipIndex, sleOfferDir, uBookEntry, offerIndex);
4507
4508 JLOG(journal_.trace()) << "getBookPage: uTipIndex=" << uTipIndex;
4509 JLOG(journal_.trace()) << "getBookPage: offerIndex=" << offerIndex;
4510 }
4511 }
4512
4513 if (!bDone)
4514 {
4515 auto sleOffer = view.read(keylet::offer(offerIndex));
4516
4517 if (sleOffer)
4518 {
4519 auto const uOfferOwnerID = sleOffer->getAccountID(sfAccount);
4520 auto const& saTakerGets = sleOffer->getFieldAmount(sfTakerGets);
4521 auto const& saTakerPays = sleOffer->getFieldAmount(sfTakerPays);
4522 STAmount saOwnerFunds;
4523 bool firstOwnerOffer(true);
4524
4525 if (book.out.getIssuer() == uOfferOwnerID)
4526 {
4527 // If an offer is selling issuer's own IOUs, it is fully
4528 // funded.
4529 saOwnerFunds = saTakerGets;
4530 }
4531 else if (bGlobalFreeze)
4532 {
4533 // If either asset is globally frozen, consider all offers
4534 // that aren't ours to be totally unfunded
4535 saOwnerFunds.clear(book.out);
4536 }
4537 else
4538 {
4539 auto umBalanceEntry = umBalance.find(uOfferOwnerID);
4540 if (umBalanceEntry != umBalance.end())
4541 {
4542 // Found in running balance table.
4543
4544 saOwnerFunds = umBalanceEntry->second;
4545 firstOwnerOffer = false;
4546 }
4547 else
4548 {
4549 // Did not find balance in table.
4550
4551 saOwnerFunds = accountHolds(
4552 view,
4553 uOfferOwnerID,
4554 book.out,
4557 viewJ);
4558
4559 if (saOwnerFunds < beast::kZero)
4560 {
4561 // Treat negative funds as zero.
4562
4563 saOwnerFunds.clear();
4564 }
4565 }
4566 }
4567
4568 json::Value jvOffer = sleOffer->getJson(JsonOptions::Values::None);
4569
4570 STAmount saTakerGetsFunded;
4571 STAmount saOwnerFundsLimit = saOwnerFunds;
4572 Rate offerRate = kParityRate;
4573
4574 if (rate != kParityRate
4575 // Have a transfer fee.
4576 && uTakerID != book.out.getIssuer()
4577 // Not taking offers of own IOUs.
4578 && book.out.getIssuer() != uOfferOwnerID)
4579 // Offer owner not issuing ownfunds
4580 {
4581 // Need to charge a transfer fee to offer owner.
4582 offerRate = rate;
4583 saOwnerFundsLimit = divide(saOwnerFunds, offerRate);
4584 }
4585
4586 if (saOwnerFundsLimit >= saTakerGets)
4587 {
4588 // Sufficient funds no shenanigans.
4589 saTakerGetsFunded = saTakerGets;
4590 }
4591 else
4592 {
4593 // Only provide, if not fully funded.
4594
4595 saTakerGetsFunded = saOwnerFundsLimit;
4596
4597 saTakerGetsFunded.setJson(jvOffer[jss::taker_gets_funded]);
// The smaller of the two amounts is written into taker_pays_funded through
// setJson.
4598 std::min(
4599 saTakerPays, multiply(saTakerGetsFunded, saDirRate, saTakerPays.asset()))
4600 .setJson(jvOffer[jss::taker_pays_funded]);
4601 }
4602
4603 STAmount const saOwnerPays = (kParityRate == offerRate)
4604 ? saTakerGetsFunded
4605 : std::min(saOwnerFunds, multiply(saTakerGetsFunded, offerRate));
4606
// Remember what the owner has left for their subsequent offers.
4607 umBalance[uOfferOwnerID] = saOwnerFunds - saOwnerPays;
4608
4609 // Include all offers funded and unfunded
4610 json::Value& jvOf = jvOffers.append(jvOffer);
4611 jvOf[jss::quality] = saDirRate.getText();
4612
4613 if (firstOwnerOffer)
4614 jvOf[jss::owner_funds] = saOwnerFunds.getText();
4615 }
4616 else
4617 {
4618 JLOG(journal_.warn()) << "Missing offer";
4619 }
4620
4621 if (!cdirNext(view, uTipIndex, sleOfferDir, uBookEntry, offerIndex))
4622 {
4623 bDirectAdvance = true;
4624 }
4625 else
4626 {
4627 JLOG(journal_.trace()) << "getBookPage: offerIndex=" << offerIndex;
4628 }
4629 }
4630 }
4631
4632 // jvResult[jss::marker] = json::Value(json::ValueType::Array);
4633 // jvResult[jss::nodes] = json::Value(json::ValueType::Array);
4634 }
4635
4636#else
4637
4638// This is the new code that uses the book iterators
4639// It has temporarily been disabled
4640
// Iterator-based order-book paging path, compiled only when
// USE_NEW_BOOK_PAGE is defined. Uses OrderBookIterator over a MetaView of the
// ledger and, unlike the old path, appends an offer to jvResult[jss::offers]
// only when the owner has non-zero funds or the owner is the taker.
// NOTE(review): this branch uses identifiers that differ from the old path in
// this listing (parityRate vs kParityRate, book.out.account vs
// book.out.getIssuer()); whether it still compiles cannot be determined from
// here.
// NOTE(review): the name line, the lpLedger parameter line, the declaration
// of umBalance and the last accountHolds argument are absent from this
// listing (4642-4643, 4653, 4703).
4641 void
4644 Book const& book,
4645 AccountID const& uTakerID,
4646 bool const bProof,
4647 unsigned int iLimit,
4648 json::Value const& jvMarker,
4649 json::Value& jvResult)
4650 {
4651 auto& jvOffers = (jvResult[jss::offers] = json::Value(json::ValueType::Array));
4652
4654
4655 MetaView lesActive(lpLedger, tapNONE, true);
4656 OrderBookIterator obIterator(lesActive, book);
4657
4658 auto const rate = transferRate(lesActive, book.out.account);
4659
4660 bool const bGlobalFreeze =
4661 lesActive.isGlobalFrozen(book.out.account) || lesActive.isGlobalFrozen(book.in.account);
4662
4663 while (iLimit-- > 0 && obIterator.nextOffer())
4664 {
4665 SLE::pointer sleOffer = obIterator.getCurrentOffer();
4666 if (sleOffer)
4667 {
4668 auto const uOfferOwnerID = sleOffer->getAccountID(sfAccount);
4669 auto const& saTakerGets = sleOffer->getFieldAmount(sfTakerGets);
4670 auto const& saTakerPays = sleOffer->getFieldAmount(sfTakerPays);
4671 STAmount saDirRate = obIterator.getCurrentRate();
4672 STAmount saOwnerFunds;
4673
4674 if (book.out.account == uOfferOwnerID)
4675 {
4676 // If offer is selling issuer's own IOUs, it is fully funded.
4677 saOwnerFunds = saTakerGets;
4678 }
4679 else if (bGlobalFreeze)
4680 {
4681 // If either asset is globally frozen, consider all offers
4682 // that aren't ours to be totally unfunded
4683 saOwnerFunds.clear(book.out);
4684 }
4685 else
4686 {
4687 auto umBalanceEntry = umBalance.find(uOfferOwnerID);
4688
4689 if (umBalanceEntry != umBalance.end())
4690 {
4691 // Found in running balance table.
4692
4693 saOwnerFunds = umBalanceEntry->second;
4694 }
4695 else
4696 {
4697 // Did not find balance in table.
4698
4699 saOwnerFunds = lesActive.accountHolds(
4700 uOfferOwnerID,
4701 book.out.currency,
4702 book.out.account,
4704
4705 if (saOwnerFunds.isNegative())
4706 {
4707 // Treat negative funds as zero.
4708
4709 saOwnerFunds.zero();
4710 }
4711 }
4712 }
4713
4714 json::Value jvOffer = sleOffer->getJson(JsonOptions::Values::None);
4715
4716 STAmount saTakerGetsFunded;
4717 STAmount saOwnerFundsLimit = saOwnerFunds;
4718 Rate offerRate = parityRate;
4719
4720 if (rate != parityRate
4721 // Have a transfer fee.
4722 && uTakerID != book.out.account
4723 // Not taking offers of own IOUs.
4724 && book.out.account != uOfferOwnerID)
4725 // Offer owner not issuing ownfunds
4726 {
4727 // Need to charge a transfer fee to offer owner.
4728 offerRate = rate;
4729 saOwnerFundsLimit = divide(saOwnerFunds, offerRate);
4730 }
4731
4732 if (saOwnerFundsLimit >= saTakerGets)
4733 {
4734 // Sufficient funds no shenanigans.
4735 saTakerGetsFunded = saTakerGets;
4736 }
4737 else
4738 {
4739 // Only provide, if not fully funded.
4740 saTakerGetsFunded = saOwnerFundsLimit;
4741
4742 saTakerGetsFunded.setJson(jvOffer[jss::taker_gets_funded]);
4743
4744 // TODO(tom): The result of this expression is not used - what's
4745 // going on here?
// NOTE(review): the value returned by std::min is used as the object on which
// setJson is called, which writes taker_pays_funded.
4746 std::min(saTakerPays, multiply(saTakerGetsFunded, saDirRate, saTakerPays.asset()))
4747 .setJson(jvOffer[jss::taker_pays_funded]);
4748 }
4749
4750 STAmount saOwnerPays = (parityRate == offerRate)
4751 ? saTakerGetsFunded
4752 : std::min(saOwnerFunds, multiply(saTakerGetsFunded, offerRate));
4753
// Remember what the owner has left for their subsequent offers.
4754 umBalance[uOfferOwnerID] = saOwnerFunds - saOwnerPays;
4755
4756 if (!saOwnerFunds.isZero() || uOfferOwnerID == uTakerID)
4757 {
4758 // Only provide funded offers and offers of the taker.
4759 json::Value& jvOf = jvOffers.append(jvOffer);
4760 jvOf[jss::quality] = saDirRate.getText();
4761 }
4762 }
4763 }
4764
4765 // jvResult[jss::marker] = json::Value(json::ValueType::Array);
4766 // jvResult[jss::nodes] = json::Value(json::ValueType::Array);
4767 }
4768
4769#endif
4770
// Publishes state-accounting figures to the stats_ gauges: for each of the
// five operating modes, the accumulated duration and the transition count.
// The snapshot from accounting_.getCounterData() is a local copy; the time
// spent so far in the current mode (`current`) is added to that copy before
// publishing. The gauges are written under statsMutex_.
// NOTE(review): the signature line and the lines that compute `current` are
// absent from this listing (4772, 4775-4776).
4771 inline void
4773 {
4774 auto [counters, mode, start, initialSync] = accounting_.getCounterData();
4777 counters[static_cast<std::size_t>(mode)].dur += current;
4778
4779 std::scoped_lock const lock(statsMutex_);
4780 stats_.disconnectedDuration.set(
4781 counters[static_cast<std::size_t>(OperatingMode::DISCONNECTED)].dur.count());
4782 stats_.connectedDuration.set(
4783 counters[static_cast<std::size_t>(OperatingMode::CONNECTED)].dur.count());
4784 stats_.syncingDuration.set(
4785 counters[static_cast<std::size_t>(OperatingMode::SYNCING)].dur.count());
4786 stats_.trackingDuration.set(
4787 counters[static_cast<std::size_t>(OperatingMode::TRACKING)].dur.count());
4788 stats_.fullDuration.set(counters[static_cast<std::size_t>(OperatingMode::FULL)].dur.count());
4789
4790 stats_.disconnectedTransitions.set(
4791 counters[static_cast<std::size_t>(OperatingMode::DISCONNECTED)].transitions);
4792 stats_.connectedTransitions.set(
4793 counters[static_cast<std::size_t>(OperatingMode::CONNECTED)].transitions);
4794 stats_.syncingTransitions.set(
4795 counters[static_cast<std::size_t>(OperatingMode::SYNCING)].transitions);
4796 stats_.trackingTransitions.set(
4797 counters[static_cast<std::size_t>(OperatingMode::TRACKING)].transitions);
4798 stats_.fullTransitions.set(counters[static_cast<std::size_t>(OperatingMode::FULL)].transitions);
4799 }
4800
// Records a transition to operating mode `om`: under mutex_, increments the
// transition count of `om`, adds time to the duration counter of the mode
// being left (mode_), then makes `om` the current mode with `now` as its
// start time. The first transition into FULL triggers an extra step.
// NOTE(review): the signature line, the body of the first-FULL branch
// (presumably recording the initial sync duration — confirm) and the
// right-hand side of the duration update are absent from this listing
// (4802, 4810-4811, 4814).
4801 void
4803 {
4804 auto now = std::chrono::steady_clock::now();
4805
4806 std::scoped_lock const lock(mutex_);
4807 ++counters_[static_cast<std::size_t>(om)].transitions;
4808 if (om == OperatingMode::FULL && counters_[static_cast<std::size_t>(om)].transitions == 1)
4809 {
4812 }
4813 counters_[static_cast<std::size_t>(mode_)].dur +=
4815
4816 mode_ = om;
4817 start_ = now;
4818 }
4819
// Writes the state-accounting counters into `obj`:
//   state_accounting.<state>.transitions / .duration_us for each entry of
//   kStates up to and including OperatingMode::FULL, as decimal strings;
//   server_state_duration_us, the time spent so far in the current mode;
//   initial_sync_duration_us, only when initialSync is non-zero.
// The time in the current mode is added to a local copy of the counters
// before they are emitted.
// NOTE(review): the signature line, the lines that compute `current` and the
// loop header's first line are absent from this listing (4821, 4824-4825,
// 4829).
4820 void
4822 {
4823 auto [counters, mode, start, initialSync] = getCounterData();
4826 counters[static_cast<std::size_t>(mode)].dur += current;
4827
4828 obj[jss::state_accounting] = json::ValueType::Object;
4830 i <= static_cast<std::size_t>(OperatingMode::FULL);
4831 ++i)
4832 {
4833 obj[jss::state_accounting][kStates[i]] = json::ValueType::Object;
4834 auto& state = obj[jss::state_accounting][kStates[i]];
4835 state[jss::transitions] = std::to_string(counters[i].transitions);
4836 state[jss::duration_us] = std::to_string(counters[i].dur.count());
4837 }
4838 obj[jss::server_state_duration_us] = std::to_string(current.count());
4839 if (initialSync != 0u)
4840 obj[jss::initial_sync_duration_us] = std::to_string(initialSync);
4841 }
4842
4843//------------------------------------------------------------------------------
4844
// Factory function: the visible body passes its parameters on, unchanged and
// in declaration order, to a single call.
// NOTE(review): the return type, the function name, the `clock` and `journal`
// parameter lines and the start of the forwarding call are absent from this
// listing (4845-4846, 4848, 4856, 4859); presumably this constructs the
// NetworkOPsImp instance — confirm against the file.
4847 ServiceRegistry& registry,
4849 bool standalone,
4850 std::size_t minPeerCount,
4851 bool startValid,
4852 JobQueue& jobQueue,
4853 LedgerMaster& ledgerMaster,
4854 ValidatorKeys const& validatorKeys,
4855 boost::asio::io_context& ioCtx,
4857 beast::insight::Collector::ptr const& collector)
4858 {
4860 registry,
4861 clock,
4862 standalone,
4863 minPeerCount,
4864 startValid,
4865 jobQueue,
4866 ledgerMaster,
4867 validatorKeys,
4868 ioCtx,
4869 journal,
4870 collector);
4871 }
4872
4873} // namespace xrpl
T any_of(T... args)
T back_inserter(T... args)
T begin(T... args)
T bind(T... args)
A generic endpoint for log messages.
Definition Journal.h:38
std::shared_ptr< Collector > ptr
Definition Collector.h:26
A metric for measuring an integral value.
Definition Gauge.h:20
A reference to a handler for performing polled collection.
Definition Hook.h:12
Decorator for streaming out compact json.
Lightweight wrapper to tag static string.
Definition json_value.h:44
Represents a JSON value.
Definition json_value.h:130
Value get(UInt index, Value const &defaultValue) const
If the array contains at least index+1 elements, returns the element value, otherwise returns default...
json::UInt UInt
Definition json_value.h:137
Value & append(Value const &value)
Append value to array at the end.
UInt size() const
Number of values in array or object.
bool isMember(char const *key) const
Return true if the object has a member named key.
A transaction that is in a closed ledger.
TxMeta const & getMeta() const
boost::container::flat_set< AccountID > const & getAffected() const
std::shared_ptr< STTx const > const & getTxn() const
bool isZero() const
Definition base_uint.h:544
bool isNonZero() const
Definition base_uint.h:549
iterator begin()
Definition base_uint.h:117
static constexpr std::size_t size()
Definition base_uint.h:530
Specifies an order book.
Definition Book.h:16
Holds transactions which were deferred to the next pass of consensus.
The role of a ClosureCounter is to assist in shutdown by letting callers wait for the completion of c...
std::uint32_t getLoadFee() const
Definition ClusterNode.h:33
NetClock::time_point getReportTime() const
Definition ClusterNode.h:39
PublicKey const & identity() const
Definition ClusterNode.h:45
std::string const & name() const
Definition ClusterNode.h:27
std::shared_ptr< InfoSub > pointer
Definition InfoSub.h:47
std::shared_ptr< InfoSub > const & ref
Definition InfoSub.h:53
std::weak_ptr< InfoSub > wptr
Definition InfoSub.h:51
A pool of threads to perform work.
Definition JobQueue.h:43
Manages the current fee schedule.
Manages load sources.
Definition LoadManager.h:27
void heartbeat()
Reset the stall detection timer.
static constexpr int kHoldLedgers
Definition LocalTxs.h:20
State accounting records two attributes for each possible server state: 1) Amount of time spent in ea...
void json(json::Value &obj) const
Output state counters in JSON format.
std::chrono::steady_clock::time_point const processStart_
static std::array< json::StaticString const, 5 > const kStates
std::array< Counters, 5 > counters_
void mode(OperatingMode om)
Record state transition.
std::chrono::steady_clock::time_point start_
Transaction with input flags and results to be applied in batches.
std::shared_ptr< Transaction > const transaction
TransactionStatus(std::shared_ptr< Transaction > t, bool a, bool l, FailHard f)
SubBookMapType subBook_
Guarded by subLock_.
std::string getHostId(bool forAdmin)
void reportConsensusStateChange(ConsensusPhase phase)
void addAccountHistoryJob(SubAccountHistoryInfoWeak subInfo)
void clearNeedNetworkLedger() override
SubInfoMapType subAccount_
hash_map< AccountID, SubMapType > SubInfoMapType
std::size_t const minPeerCount_
std::vector< TransactionStatus > transactions_
std::set< uint256 > pendingValidations_
void pubAccountTransaction(std::shared_ptr< ReadView const > const &ledger, AcceptedLedgerTx const &transaction, bool last)
NetworkOPsImp(ServiceRegistry &registry, NetworkOPs::clock_type &clock, bool standalone, std::size_t minPeerCount, bool startValid, JobQueue &jobQueue, LedgerMaster &ledgerMaster, ValidatorKeys const &validatorKeys, boost::asio::io_context &ioCtx, beast::Journal journal, beast::insight::Collector::ptr const &collector)
ClosureCounter< void, boost::system::error_code const & > waitHandlerCounter_
std::condition_variable cond_
MultiApiJson transJson(std::shared_ptr< STTx const > const &transaction, TER result, bool validated, std::shared_ptr< ReadView const > const &ledger, std::optional< std::reference_wrapper< TxMeta const > > meta)
bool unsubManifests(std::uint64_t uListener) override
json::Value getOwnerInfo(std::shared_ptr< ReadView const > lpLedger, AccountID const &account) override
void unsubAccount(InfoSub::ref ispListener, hash_set< AccountID > const &vnaAccountIDs, bool rt) override
hash_map< Book, SubMapType > SubBookMapType
Maps each order book to its current set of subscribers.
bool subManifests(InfoSub::ref ispListener) override
void stateAccounting(json::Value &obj) override
void pubLedger(std::shared_ptr< ReadView const > const &lpAccepted) override
void stop() override
DispatchState dispatchState_
hash_map< std::string, InfoSub::pointer > subRpcMapType
ErrorCodeI subAccountHistory(InfoSub::ref ispListener, AccountID const &account) override
Subscribe to an account's new transactions and retrieve the account's historical transactions.
bool subLedger(InfoSub::ref ispListener, json::Value &jvResult) override
void subAccount(InfoSub::ref ispListener, hash_set< AccountID > const &vnaAccountIDs, bool rt) override
void transactionBatch()
Apply transactions in batches.
void setTimer(boost::asio::steady_timer &timer, std::chrono::milliseconds const &expiryTime, std::function< void()> onExpire, std::function< void()> onError)
bool unsubRTTransactions(std::uint64_t uListener) override
beast::Journal const & journal() const override
Journal used by InfoSub for diagnostics that occur after the owning subsystem (e.g.
json::Value getLedgerFetchInfo() override
bool processTrustedProposal(RCLCxPeerPos proposal) override
subRpcMapType rpcSubMap_
void subAccountHistoryStart(std::shared_ptr< ReadView const > const &ledger, SubAccountHistoryInfoWeak &subInfo)
void pubValidation(std::shared_ptr< STValidation > const &val) override
bool subBook(InfoSub::ref ispListener, Book const &) override
InfoSub::pointer addRpcSub(std::string const &strUrl, InfoSub::ref) override
json::Value getConsensusInfo() override
ConsensusPhase lastConsensusPhase_
beast::Journal journal_
bool subServer(InfoSub::ref ispListener, json::Value &jvResult, bool admin) override
void endConsensus(std::unique_ptr< std::stringstream > const &clog) override
void setMode(OperatingMode om) override
void setAmendmentBlocked() override
void pubConsensus(ConsensusPhase phase)
bool isNeedNetworkLedger() override
bool unsubBookInternal(std::uint64_t uListener, Book const &) override
Remove a book subscription during InfoSub teardown.
DispatchState
Synchronization states for transaction batches.
SubInfoMapType subRTAccount_
std::atomic< bool > needNetworkLedger_
boost::asio::steady_timer heartbeatTimer_
std::array< SubMapType, SubTypes::SLastEntry > streamMaps_
bool subConsensus(InfoSub::ref ispListener) override
std::reference_wrapper< ServiceRegistry > registry_
static std::array< char const *, 5 > const kStates
bool unsubLedger(std::uint64_t uListener) override
bool checkLastClosedLedger(Overlay::PeerSequence const &, uint256 &networkClosed)
void pubProposedAccountTransaction(std::shared_ptr< ReadView const > const &ledger, std::shared_ptr< STTx const > const &transaction, TER result)
void unsubAccountHistoryInternal(std::uint64_t seq, AccountID const &account, bool historyOnly) override
void pubValidatedTransaction(std::shared_ptr< ReadView const > const &ledger, AcceptedLedgerTx const &transaction, bool last)
void pubBookTransaction(AcceptedLedgerTx const &transaction, MultiApiJson const &jvObj)
Fan transaction notifications out to all book subscribers.
void switchLastClosedLedger(std::shared_ptr< Ledger const > const &newLCL)
std::optional< PublicKey > const validatorPK_
std::atomic< bool > amendmentBlocked_
bool isFull() override
void clearAmendmentWarned() override
LedgerMaster & ledgerMaster_
std::atomic< OperatingMode > mode_
void updateLocalTx(ReadView const &view) override
void clearLedgerFetch() override
std::unique_ptr< LocalTxs > localTX_
void apply(std::unique_lock< std::mutex > &batchLock)
Attempt to apply transactions and post-process based on the results.
InfoSub::pointer findRpcSub(std::string const &strUrl) override
bool isAmendmentBlocked() override
std::string strOperatingMode(OperatingMode const mode, bool const admin) const override
void setStandAlone() override
void setNeedNetworkLedger() override
hash_map< std::uint64_t, InfoSub::wptr > SubMapType
std::size_t getBookSubscribersCount() override
Total number of (book, subscriber) entries currently tracked.
bool unsubServer(std::uint64_t uListener) override
bool unsubConsensus(std::uint64_t uListener) override
void pubManifest(Manifest const &) override
void consensusViewChange() override
boost::asio::steady_timer accountHistoryTxTimer_
bool recvValidation(std::shared_ptr< STValidation > const &val, std::string const &source) override
void setUNLBlocked() override
bool unsubValidations(std::uint64_t uListener) override
bool subPeerStatus(InfoSub::ref ispListener) override
void doTransactionAsync(std::shared_ptr< Transaction > transaction, bool bUnlimited, FailHard failtype)
For transactions not submitted by a locally connected client, fire and forget.
OperatingMode getOperatingMode() const override
std::optional< PublicKey > const validatorMasterPK_
void doTransactionSyncBatch(std::unique_lock< std::mutex > &lock, std::function< bool(std::unique_lock< std::mutex > const &)> retryCallback)
bool tryRemoveRpcSub(std::string const &strUrl) override
bool beginConsensus(uint256 const &networkClosed, std::unique_ptr< std::stringstream > const &clog) override
hash_map< AccountID, hash_map< std::uint64_t, SubAccountHistoryInfoWeak > > SubAccountHistoryMapType
void doTransactionSync(std::shared_ptr< Transaction > transaction, bool bUnlimited, FailHard failType)
For transactions submitted directly by a client, apply batch of transactions and wait for this transa...
void submitTransaction(std::shared_ptr< STTx const > const &) override
void setAmendmentWarned() override
void pubPeerStatus(std::function< json::Value(void)> const &) override
StateAccounting accounting_
SubAccountHistoryMapType subAccountHistory_
bool subValidations(InfoSub::ref ispListener) override
void setAccountHistoryJobTimer(SubAccountHistoryInfoWeak subInfo)
bool subRTTransactions(InfoSub::ref ispListener) override
std::atomic< bool > unlBlocked_
bool unsubBookChanges(std::uint64_t uListener) override
void unsubAccountHistory(InfoSub::ref ispListener, AccountID const &account, bool historyOnly) override
Unsubscribe from an account's transactions.
void setStateTimer() override
Called to initially start our timers.
std::size_t getLocalTxCount() override
bool preProcessTransaction(std::shared_ptr< Transaction > &transaction)
void processTransaction(std::shared_ptr< Transaction > &transaction, bool bUnlimited, bool bLocal, FailHard failType) override
Process transactions as they arrive from the network or which are submitted by clients.
RCLConsensus consensus_
bool unsubTransactions(std::uint64_t uListener) override
bool isAmendmentWarned() override
void getBookPage(std::shared_ptr< ReadView const > &lpLedger, Book const &, AccountID const &uTakerID, bool const bProof, unsigned int iLimit, json::Value const &jvMarker, json::Value &jvResult) override
bool subTransactions(InfoSub::ref ispListener) override
std::mutex validationsMutex_
std::uint32_t acceptLedger(std::optional< std::chrono::milliseconds > consensusDelay) override
Accepts the current transaction tree and returns the new ledger's sequence.
void clearUNLBlocked() override
bool isUNLBlocked() override
void pubProposedTransaction(std::shared_ptr< ReadView const > const &ledger, std::shared_ptr< STTx const > const &transaction, TER result) override
std::atomic< bool > amendmentWarned_
std::recursive_mutex subLock_
boost::asio::steady_timer clusterTimer_
bool unsubPeerStatus(std::uint64_t uListener) override
bool unsubBook(InfoSub::ref ispListener, Book const &) override
Remove a book subscription for a live subscriber.
void reportFeeChange() override
ServerFeeSummary lastFeeSummary_
void processTransactionSet(CanonicalTXSet const &set) override
Process a set of transactions synchronously, ensuring that they are processed in one batch.
void mapComplete(std::shared_ptr< SHAMap > const &map, bool fromAcquire) override
bool isBlocked() override
~NetworkOPsImp() override
json::Value getServerInfo(bool human, bool admin, bool counters) override
void unsubAccountInternal(std::uint64_t seq, hash_set< AccountID > const &vnaAccountIDs, bool rt) override
bool subBookChanges(InfoSub::ref ispListener) override
Provides server functionality for clients.
Definition NetworkOPs.h:71
beast::AbstractClock< std::chrono::steady_clock > clock_type
Definition NetworkOPs.h:73
Writable ledger view that accumulates state and tx changes.
Definition OpenView.h:45
std::vector< std::shared_ptr< Peer > > PeerSequence
Definition Overlay.h:53
Manages the generic consensus algorithm for use by the RCL.
A peer's signed, proposed position for use in RCLConsensus.
PublicKey const & publicKey() const
Public key of peer that sent the proposal.
Represents a set of transactions in RCLConsensus.
Definition RCLCxTx.h:43
Wraps a ledger instance for use in generic Validations LedgerTrie.
static std::string getWordFromBlob(void const *blob, size_t bytes)
Chooses a single dictionary word from the data.
Definition RFC1751.cpp:432
Collects logging information.
std::unique_ptr< std::stringstream > const & ss()
A view into a ledger.
Definition ReadView.h:31
virtual SLE::const_pointer read(Keylet const &k) const =0
Return the state item associated with a key.
virtual std::optional< key_type > succ(key_type const &key, std::optional< key_type > const &last=std::nullopt) const =0
Return the key of the next state item.
std::vector< AccountTx > AccountTxs
std::string getText() const override
Definition STAmount.cpp:646
Asset const & asset() const
Definition STAmount.h:478
void setJson(json::Value &) const
Definition STAmount.cpp:606
std::shared_ptr< STLedgerEntry > pointer
std::shared_ptr< STLedgerEntry const > const_pointer
Automatically unlocks and re-locks a unique_lock object.
Definition scope.h:202
std::size_t size() const noexcept
Definition Serializer.h:50
void const * data() const noexcept
Definition Serializer.h:56
Service registry for dependency injection.
static time_point now()
Validator keys and manifest as set in configuration file.
json::Value jsonClipped() const
Definition XRPAmount.h:199
constexpr double decimalXRP() const
Definition XRPAmount.h:243
T duration_cast(T... args)
T emplace_back(T... args)
T emplace(T... args)
T empty(T... args)
T end(T... args)
T erase(T... args)
T find(T... args)
T get(T... args)
T insert(T... args)
T is_sorted(T... args)
T lock(T... args)
T make_pair(T... args)
T make_shared(T... args)
T make_unique(T... args)
T max(T... args)
T min(T... args)
void rngfill(void *const buffer, std::size_t const bytes, Generator &g)
Definition rngfill.h:14
JSON (JavaScript Object Notation).
Definition json_errors.h:5
int Int
unsigned int UInt
@ Array
array value (ordered list)
Definition json_value.h:25
@ Object
object value (collection of name/value pairs).
Definition json_value.h:26
STL namespace.
std::string const & getVersionString()
Server version.
Definition BuildInfo.cpp:68
std::optional< std::string > encodeCTID(uint32_t ledgerSeq, uint32_t txnIndex, uint32_t networkID) noexcept
Encodes ledger sequence, transaction index, and network ID into a CTID string.
Definition CTID.h:31
void insertMPTokenIssuanceID(json::Value &response, std::shared_ptr< STTx const > const &transaction, TxMeta const &transactionMeta)
static constexpr std::integral_constant< unsigned, Version > kApiVersion
Definition ApiVersion.h:38
void insertNFTSyntheticInJson(json::Value &, std::shared_ptr< STTx const > const &, TxMeta const &)
Adds common synthetic fields to transaction-related JSON responses.
json::Value computeBookChanges(std::shared_ptr< L const > const &lpAccepted)
Definition BookChanges.h:27
void insertDeliveredAmount(json::Value &meta, ReadView const &, std::shared_ptr< STTx const > const &serializedTx, TxMeta const &)
Add a delivered_amount field to the meta input/output parameter.
Charge const kFeeMediumBurdenRpc
TER valid(STTx const &tx, ReadView const &view, AccountID const &src, beast::Journal j)
std::string const & getCommitHash()
Definition Git.cpp:18
std::string const & getBuildBranch()
Definition Git.cpp:25
Keylet book(Book const &b)
The beginning of an order book.
Definition Indexes.cpp:235
Keylet ownerDir(AccountID const &id) noexcept
The root page of an account's directory.
Definition Indexes.cpp:357
Keylet offer(AccountID const &id, std::uint32_t seq) noexcept
An offer from an account.
Definition Indexes.cpp:264
Keylet child(uint256 const &key) noexcept
Any item that can be in an owner dir.
Definition Indexes.cpp:192
Keylet account(AccountID const &id) noexcept
AccountID root.
Definition Indexes.cpp:186
Keylet page(uint256 const &root, std::uint64_t index=0) noexcept
A page in a directory.
Definition Indexes.cpp:363
Rate rate(Env &env, Account const &account, std::uint32_t const &seq)
Definition escrow.cpp:57
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
@ WarnRpcAmendmentBlocked
Definition ErrorCodes.h:157
@ WarnRpcUnsupportedMajority
Definition ErrorCodes.h:156
@ WarnRpcExpiredValidatorList
Definition ErrorCodes.h:158
STAmount divide(STAmount const &amount, Rate const &rate)
Definition Rate2.cpp:69
@ terQUEUED
Definition TER.h:217
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section. If the named value is not found or doesn't parse as a T,...
constexpr FlagValue tfInnerBatchTxn
Definition TxFlags.h:43
bool isTerRetry(TER x) noexcept
Definition TER.h:657
ErrorCodeI
Definition ErrorCodes.h:22
@ RpcInternal
Definition ErrorCodes.h:112
@ RpcSuccess
Definition ErrorCodes.h:26
@ RpcInvalidParams
Definition ErrorCodes.h:66
void handleNewValidation(Application &app, std::shared_ptr< STValidation > const &val, std::string const &source, BypassAccept const bypassAccept, std::optional< beast::Journal > j)
Handle a new validation.
@ WrongLedger
We have the wrong ledger and are attempting to acquire it.
@ Proposing
We are a normal participant in consensus and propose our position.
std::optional< std::uint64_t > mulDiv(std::uint64_t value, std::uint64_t mul, std::uint64_t div)
Return value*mul/div accurately.
SendIfPred< Predicate > sendIf(std::shared_ptr< Message > const &m, Predicate const &f)
Helper function to aid in type deduction.
Definition predicates.h:54
@ SigBad
Signature is bad. Didn't do local checks.
Definition apply.h:21
@ Valid
Signature and local checks are good / passed.
Definition apply.h:25
T get(Section const &section, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.
constexpr std::size_t kMaxPoppedTransactions
std::string strHex(FwdIt begin, FwdIt end)
Definition strHex.h:10
std::unique_ptr< FeeVote > makeFeeVote(FeeSetup const &setup, beast::Journal journal)
Create an instance of the FeeVote logic.
std::pair< Validity, std::string > checkValidity(HashRouter &router, STTx const &tx, Rules const &rules)
Checks transaction signature and local checks.
Definition apply.cpp:37
Rules makeRulesGivenLedger(DigestAwareReadView const &ledger, Rules const &current)
Definition ReadView.cpp:61
FeeSetup setupFeeVote(Section const &section)
Definition Config.cpp:1199
@ tefPAST_SEQ
Definition TER.h:165
constexpr std::enable_if_t< std::is_integral_v< Dest > &&std::is_integral_v< Src >, Dest > safeCast(Src s) noexcept
Definition safe_cast.h:21
std::uint64_t getQuality(uint256 const &uBase)
Definition Indexes.cpp:152
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
Definition AccountID.cpp:93
std::unordered_set< Value, Hash, Pred, Allocator > hash_set
Number root(Number f, unsigned d)
Definition Number.cpp:1201
CsprngEngine & cryptoPrng()
The default cryptographically secure PRNG.
bool transResultInfo(TER code, std::string &token, std::string &text)
Definition TER.cpp:232
Seed generateSeed(std::string const &passPhrase)
Generate a seed deterministically.
Definition Seed.cpp:58
std::pair< PublicKey, SecretKey > generateKeyPair(KeyType type, Seed const &seed)
Generate a key pair deterministically.
constexpr std::uint32_t kFeeUnitsDeprecated
std::string to_string(BaseUInt< Bits, Tag > const &a)
Definition base_uint.h:633
std::string toStringIso(date::sys_time< Duration > tp)
Definition chrono.h:68
STAmount accountFunds(ReadView const &view, AccountID const &id, STAmount const &saDefault, FreezeHandling freezeHandling, beast::Journal j)
json::Value rpcError(ErrorCodeI iError)
Definition RPCErr.cpp:13
bool isGlobalFrozen(ReadView const &view, AccountID const &issuer)
Check if the issuer has the global freeze flag set.
STAmount amountFromQuality(std::uint64_t rate)
Definition STAmount.cpp:895
@ JtClientAcctHist
Definition Job.h:30
@ JtTransaction
Definition Job.h:43
@ JtBatch
Definition Job.h:46
@ JtTxnProc
Definition Job.h:63
@ JtClientConsensus
Definition Job.h:29
@ JtNetopCluster
Definition Job.h:56
@ JtClientFeeChange
Definition Job.h:28
void forAllApiVersions(Fn const &fn, Args &&... args)
Definition ApiVersion.h:158
bool isTefFailure(TER x) noexcept
Definition TER.h:651
std::unique_ptr< LocalTxs > makeLocalTxs()
Definition LocalTxs.cpp:184
hash_set< Book > affectedBooks(AcceptedLedgerTx const &alTx, beast::Journal const &j)
Extract the set of books affected by a transaction.
Rate transferRate(ReadView const &view, AccountID const &issuer)
Returns IOU issuer transfer fee as Rate.
std::unique_ptr< NetworkOPs > makeNetworkOPs(ServiceRegistry &registry, NetworkOPs::clock_type &clock, bool standalone, std::size_t minPeerCount, bool startValid, JobQueue &jobQueue, LedgerMaster &ledgerMaster, ValidatorKeys const &validatorKeys, boost::asio::io_context &ioSvc, beast::Journal journal, beast::insight::Collector::ptr const &collector)
uint256 getQualityNext(uint256 const &uBase)
Definition Indexes.cpp:144
ConsensusPhase
Phases of consensus for a single ledger round.
@ Open
We haven't closed our ledger yet, but others might have.
Rate const kParityRate
A transfer rate signifying a 1:1 exchange.
json::Value getJson(LedgerFill const &fill)
Return a new json::Value representing the ledger with given options.
AccountID calcAccountID(PublicKey const &pk)
uint256 getBookBase(Book const &book)
Definition Indexes.cpp:102
static std::array< char const *, 5 > const kStateNames
constexpr auto kMuldivMax
Definition mulDiv.h:8
std::unordered_map< Key, Value, Hash, Pred, Allocator > hash_map
ApplyFlags
Definition ApplyView.h:12
@ TapUnlimited
Definition ApplyView.h:24
@ TapFailHard
Definition ApplyView.h:17
@ TapNone
Definition ApplyView.h:13
BaseUInt< 160, detail::AccountIDTag > AccountID
A 160-bit unsigned that uniquely identifies an account.
Definition AccountID.h:28
bool isTelLocal(TER x) noexcept
Definition TER.h:639
bool cdirNext(ReadView const &view, uint256 const &root, SLE::const_pointer &page, unsigned int &index, uint256 &entry)
Returns the next entry in the directory, advancing the index.
@ temINVALID_FLAG
Definition TER.h:97
@ temBAD_SIGNATURE
Definition TER.h:91
bool isTesSuccess(TER x) noexcept
Definition TER.h:663
static std::uint32_t trunc32(std::uint64_t v)
TERSubset< CanCvtToTER > TER
Definition TER.h:634
STAmount multiply(STAmount const &amount, Number const &frac, Number::RoundingMode rm)
bool cdirFirst(ReadView const &view, uint256 const &root, SLE::const_pointer &page, unsigned int &index, uint256 &entry)
Returns the first entry in the directory, advancing the index.
OperatingMode
Specifies the mode under which the server believes it's operating.
Definition NetworkOPs.h:50
@ TRACKING
convinced we agree with the network
Definition NetworkOPs.h:54
@ DISCONNECTED
not ready to process requests
Definition NetworkOPs.h:51
@ CONNECTED
convinced we are talking to the network
Definition NetworkOPs.h:52
@ FULL
we have the ledger and can even validate
Definition NetworkOPs.h:55
@ SYNCING
fallen slightly behind
Definition NetworkOPs.h:53
std::shared_ptr< STTx const > sterilize(STTx const &stx)
Sterilize a transaction.
Definition STTx.cpp:801
static auto const kGenesisAccountId
BaseUInt< 256 > uint256
Definition base_uint.h:562
bool isTemMalformed(TER x) noexcept
Definition TER.h:645
STAmount accountHolds(ReadView const &view, AccountID const &account, Currency const &currency, AccountID const &issuer, FreezeHandling zeroIfFrozen, beast::Journal j, SpendableHandling includeFullBalance=SpendableHandling::SimpleBalance)
@ tesSUCCESS
Definition TER.h:240
XRPL_NO_SANITIZE_ADDRESS void Throw(Args &&... args)
Definition contract.h:49
CanonicalTXSet OrderedTxs
Definition OpenLedger.h:27
detail::MultiApiJson< RPC::kApiMinimumSupportedVersion, RPC::kApiMaximumValidVersion > MultiApiJson
T owns_lock(T... args)
T ref(T... args)
T reserve(T... args)
T reset(T... args)
T set_intersection(T... args)
T size(T... args)
T str(T... args)
static constexpr auto kPort
Definition Constants.h:142
static constexpr auto kIp
Definition Constants.h:113
PublicKey masterKey
The master key associated with this manifest.
Definition Manifest.h:67
std::string serialized
The manifest in serialized form.
Definition Manifest.h:64
Blob getMasterSignature() const
Returns manifest master key signature.
std::string domain
The domain, if one was specified in the manifest; empty otherwise.
Definition Manifest.h:79
std::optional< Blob > getSignature() const
Returns manifest signature.
std::optional< PublicKey > signingKey
The ephemeral key associated with this manifest.
Definition Manifest.h:73
std::uint32_t sequence
The sequence number of this manifest.
Definition Manifest.h:76
Server fees published on server subscription.
std::optional< TxQ::Metrics > em
bool operator!=(ServerFeeSummary const &b) const
bool operator==(ServerFeeSummary const &b) const
beast::insight::Gauge fullTransitions
beast::insight::Gauge disconnectedTransitions
beast::insight::Gauge connectedDuration
Stats(Handler const &handler, beast::insight::Collector::ptr const &collector)
beast::insight::Gauge trackingTransitions
beast::insight::Gauge fullDuration
beast::insight::Gauge syncingDuration
beast::insight::Gauge disconnectedDuration
beast::insight::Gauge connectedTransitions
beast::insight::Gauge trackingDuration
beast::insight::Hook hook
beast::insight::Gauge syncingTransitions
SubAccountHistoryIndex(AccountID const &accountId)
std::shared_ptr< SubAccountHistoryIndex > index
std::shared_ptr< SubAccountHistoryIndex > index
Select all peers (except optional excluded) that are in our cluster.
Definition predicates.h:112
Represents a transfer rate.
Definition Rate.h:20
Data format for exchanging consumption information across peers.
Definition Gossip.h:11
std::vector< Item > items
Definition Gossip.h:23
static constexpr auto kPortGrpc
Definition Constants.h:44
Sends a message to all peers.
Definition predicates.h:12
Changes in trusted nodes after updating validator list.
hash_set< NodeID > added
hash_set< NodeID > removed
Structure returned by TxQ::getMetrics, expressed in reference fee level units.
Definition TxQ.h:144
void set(char const *key, auto const &v)
IsMemberResult isMember(char const *key) const
T time_since_epoch(T... args)
T to_string(T... args)
T unlock(T... args)
T value_or(T... args)
T what(T... args)