rippled
Loading...
Searching...
No Matches
NetworkOPs.cpp
1#include <xrpld/app/consensus/RCLConsensus.h>
2#include <xrpld/app/consensus/RCLCxPeerPos.h>
3#include <xrpld/app/consensus/RCLValidations.h>
4#include <xrpld/app/ledger/AcceptedLedger.h>
5#include <xrpld/app/ledger/InboundLedgers.h>
6#include <xrpld/app/ledger/LedgerMaster.h>
7#include <xrpld/app/ledger/LedgerToJson.h>
8#include <xrpld/app/ledger/LocalTxs.h>
9#include <xrpld/app/ledger/OpenLedger.h>
10#include <xrpld/app/ledger/TransactionMaster.h>
11#include <xrpld/app/main/LoadManager.h>
12#include <xrpld/app/main/Tuning.h>
13#include <xrpld/app/misc/AmendmentTable.h>
14#include <xrpld/app/misc/DeliverMax.h>
15#include <xrpld/app/misc/LoadFeeTrack.h>
16#include <xrpld/app/misc/Transaction.h>
17#include <xrpld/app/misc/TxQ.h>
18#include <xrpld/app/misc/ValidatorKeys.h>
19#include <xrpld/app/misc/ValidatorList.h>
20#include <xrpld/app/misc/detail/AccountTxPaging.h>
21#include <xrpld/app/misc/make_NetworkOPs.h>
22#include <xrpld/app/rdb/backend/SQLiteDatabase.h>
23#include <xrpld/app/tx/apply.h>
24#include <xrpld/consensus/Consensus.h>
25#include <xrpld/consensus/ConsensusParms.h>
26#include <xrpld/overlay/Cluster.h>
27#include <xrpld/overlay/Overlay.h>
28#include <xrpld/overlay/predicates.h>
29#include <xrpld/rpc/BookChanges.h>
30#include <xrpld/rpc/CTID.h>
31#include <xrpld/rpc/DeliveredAmount.h>
32#include <xrpld/rpc/MPTokenIssuanceID.h>
33#include <xrpld/rpc/ServerHandler.h>
34
35#include <xrpl/basics/UptimeClock.h>
36#include <xrpl/basics/mulDiv.h>
37#include <xrpl/basics/safe_cast.h>
38#include <xrpl/basics/scope.h>
39#include <xrpl/beast/utility/rngfill.h>
40#include <xrpl/core/HashRouter.h>
41#include <xrpl/core/PerfLog.h>
42#include <xrpl/crypto/RFC1751.h>
43#include <xrpl/crypto/csprng.h>
44#include <xrpl/ledger/OrderBookDB.h>
45#include <xrpl/protocol/BuildInfo.h>
46#include <xrpl/protocol/Feature.h>
47#include <xrpl/protocol/MultiApiJson.h>
48#include <xrpl/protocol/NFTSyntheticSerializer.h>
49#include <xrpl/protocol/RPCErr.h>
50#include <xrpl/protocol/TxFlags.h>
51#include <xrpl/protocol/jss.h>
52#include <xrpl/resource/Fees.h>
53#include <xrpl/resource/ResourceManager.h>
54
55#include <boost/asio/ip/host_name.hpp>
56#include <boost/asio/steady_timer.hpp>
57
58#include <algorithm>
59#include <exception>
60#include <mutex>
61#include <optional>
62#include <set>
63#include <sstream>
64#include <string>
65#include <tuple>
66#include <unordered_map>
67
68namespace xrpl {
69
70class NetworkOPsImp final : public NetworkOPs
71{
77 {
78 public:
80 bool const admin;
81 bool const local;
83 bool applied = false;
85
87 : transaction(t), admin(a), local(l), failType(f)
88 {
89 XRPL_ASSERT(
91 "xrpl::NetworkOPsImp::TransactionStatus::TransactionStatus : "
92 "valid inputs");
93 }
94 };
95
99 enum class DispatchState : unsigned char {
100 none,
101 scheduled,
102 running,
103 };
104
106
122 {
130
134 std::chrono::steady_clock::time_point start_ = std::chrono::steady_clock::now();
135 std::chrono::steady_clock::time_point const processStart_ = start_;
138
139 public:
141 {
142 counters_[static_cast<std::size_t>(OperatingMode::DISCONNECTED)].transitions = 1;
143 }
144
151 void
153
159 void
160 json(Json::Value& obj) const;
161
163 {
165 decltype(mode_) mode;
166 decltype(start_) start;
168 };
169
172 {
175 }
176 };
177
180 {
181 ServerFeeSummary() = default;
182
183 ServerFeeSummary(XRPAmount fee, TxQ::Metrics&& escalationMetrics, LoadFeeTrack const& loadFeeTrack);
184 bool
185 operator!=(ServerFeeSummary const& b) const;
186
187 bool
189 {
190 return !(*this != b);
191 }
192
197 };
198
199public:
201 ServiceRegistry& registry,
203 bool standalone,
204 std::size_t minPeerCount,
205 bool start_valid,
206 JobQueue& job_queue,
208 ValidatorKeys const& validatorKeys,
209 boost::asio::io_context& io_svc,
210 beast::Journal journal,
211 beast::insight::Collector::ptr const& collector)
212 : registry_(registry)
213 , m_journal(journal)
216 , heartbeatTimer_(io_svc)
217 , clusterTimer_(io_svc)
218 , accountHistoryTxTimer_(io_svc)
219 , mConsensus(
220 registry_.app(),
222 setup_FeeVote(registry_.app().config().section("voting")),
223 registry_.logs().journal("FeeVote")),
225 *m_localTX,
226 registry.getInboundTransactions(),
227 beast::get_abstract_clock<std::chrono::steady_clock>(),
228 validatorKeys,
229 registry_.logs().journal("LedgerConsensus"))
230 , validatorPK_(validatorKeys.keys ? validatorKeys.keys->publicKey : decltype(validatorPK_){})
231 , validatorMasterPK_(validatorKeys.keys ? validatorKeys.keys->masterPublicKey : decltype(validatorMasterPK_){})
233 , m_job_queue(job_queue)
234 , m_standalone(standalone)
235 , minPeerCount_(start_valid ? 0 : minPeerCount)
236 , m_stats(std::bind(&NetworkOPsImp::collect_metrics, this), collector)
237 {
238 }
239
240 ~NetworkOPsImp() override
241 {
242 // This clear() is necessary to ensure the shared_ptrs in this map get
243 // destroyed NOW because the objects in this map invoke methods on this
244 // class when they are destroyed
246 }
247
248public:
250 getOperatingMode() const override;
251
253 strOperatingMode(OperatingMode const mode, bool const admin) const override;
254
256 strOperatingMode(bool const admin = false) const override;
257
258 //
259 // Transaction operations.
260 //
261
262 // Must complete immediately.
263 void
265
266 void
267 processTransaction(std::shared_ptr<Transaction>& transaction, bool bUnlimited, bool bLocal, FailHard failType)
268 override;
269
270 void
271 processTransactionSet(CanonicalTXSet const& set) override;
272
281 void
282 doTransactionSync(std::shared_ptr<Transaction> transaction, bool bUnlimited, FailHard failType);
283
293 void
294 doTransactionAsync(std::shared_ptr<Transaction> transaction, bool bUnlimited, FailHard failtype);
295
296private:
297 bool
299
300 void
303 std::function<bool(std::unique_lock<std::mutex> const&)> retryCallback);
304
305public:
309 void
311
317 void
319
320 //
321 // Owner functions.
322 //
323
325 getOwnerInfo(std::shared_ptr<ReadView const> lpLedger, AccountID const& account) override;
326
327 //
328 // Book functions.
329 //
330
331 void
334 Book const&,
335 AccountID const& uTakerID,
336 bool const bProof,
337 unsigned int iLimit,
338 Json::Value const& jvMarker,
339 Json::Value& jvResult) override;
340
341 // Ledger proposal/close functions.
342 bool
344
345 bool
346 recvValidation(std::shared_ptr<STValidation> const& val, std::string const& source) override;
347
348 void
349 mapComplete(std::shared_ptr<SHAMap> const& map, bool fromAcquire) override;
350
351 // Network state machine.
352
353 // Used for the "jump" case.
354private:
355 void
357 bool
359
360public:
361 bool
362 beginConsensus(uint256 const& networkClosed, std::unique_ptr<std::stringstream> const& clog) override;
363 void
365 void
366 setStandAlone() override;
367
371 void
372 setStateTimer() override;
373
374 void
375 setNeedNetworkLedger() override;
376 void
377 clearNeedNetworkLedger() override;
378 bool
379 isNeedNetworkLedger() override;
380 bool
381 isFull() override;
382
383 void
384 setMode(OperatingMode om) override;
385
386 bool
387 isBlocked() override;
388 bool
389 isAmendmentBlocked() override;
390 void
391 setAmendmentBlocked() override;
392 bool
393 isAmendmentWarned() override;
394 void
395 setAmendmentWarned() override;
396 void
397 clearAmendmentWarned() override;
398 bool
399 isUNLBlocked() override;
400 void
401 setUNLBlocked() override;
402 void
403 clearUNLBlocked() override;
404 void
405 consensusViewChange() override;
406
408 getConsensusInfo() override;
410 getServerInfo(bool human, bool admin, bool counters) override;
411 void
412 clearLedgerFetch() override;
414 getLedgerFetchInfo() override;
417 void
418 reportFeeChange() override;
419 void
421
422 void
423 updateLocalTx(ReadView const& view) override;
425 getLocalTxCount() override;
426
427 //
428 // Monitoring: publisher side.
429 //
430 void
431 pubLedger(std::shared_ptr<ReadView const> const& lpAccepted) override;
432 void
435 std::shared_ptr<STTx const> const& transaction,
436 TER result) override;
437 void
438 pubValidation(std::shared_ptr<STValidation> const& val) override;
439
440 //--------------------------------------------------------------------------
441 //
442 // InfoSub::Source.
443 //
444 void
445 subAccount(InfoSub::ref ispListener, hash_set<AccountID> const& vnaAccountIDs, bool rt) override;
446 void
447 unsubAccount(InfoSub::ref ispListener, hash_set<AccountID> const& vnaAccountIDs, bool rt) override;
448
449 // Just remove the subscription from the tracking
450 // not from the InfoSub. Needed for InfoSub destruction
451 void
452 unsubAccountInternal(std::uint64_t seq, hash_set<AccountID> const& vnaAccountIDs, bool rt) override;
453
455 subAccountHistory(InfoSub::ref ispListener, AccountID const& account) override;
456 void
457 unsubAccountHistory(InfoSub::ref ispListener, AccountID const& account, bool historyOnly) override;
458
459 void
460 unsubAccountHistoryInternal(std::uint64_t seq, AccountID const& account, bool historyOnly) override;
461
462 bool
463 subLedger(InfoSub::ref ispListener, Json::Value& jvResult) override;
464 bool
465 unsubLedger(std::uint64_t uListener) override;
466
467 bool
468 subBookChanges(InfoSub::ref ispListener) override;
469 bool
470 unsubBookChanges(std::uint64_t uListener) override;
471
472 bool
473 subServer(InfoSub::ref ispListener, Json::Value& jvResult, bool admin) override;
474 bool
475 unsubServer(std::uint64_t uListener) override;
476
477 bool
478 subBook(InfoSub::ref ispListener, Book const&) override;
479 bool
480 unsubBook(std::uint64_t uListener, Book const&) override;
481
482 bool
483 subManifests(InfoSub::ref ispListener) override;
484 bool
485 unsubManifests(std::uint64_t uListener) override;
486 void
487 pubManifest(Manifest const&) override;
488
489 bool
490 subTransactions(InfoSub::ref ispListener) override;
491 bool
492 unsubTransactions(std::uint64_t uListener) override;
493
494 bool
495 subRTTransactions(InfoSub::ref ispListener) override;
496 bool
497 unsubRTTransactions(std::uint64_t uListener) override;
498
499 bool
500 subValidations(InfoSub::ref ispListener) override;
501 bool
502 unsubValidations(std::uint64_t uListener) override;
503
504 bool
505 subPeerStatus(InfoSub::ref ispListener) override;
506 bool
507 unsubPeerStatus(std::uint64_t uListener) override;
508 void
509 pubPeerStatus(std::function<Json::Value(void)> const&) override;
510
511 bool
512 subConsensus(InfoSub::ref ispListener) override;
513 bool
514 unsubConsensus(std::uint64_t uListener) override;
515
517 findRpcSub(std::string const& strUrl) override;
519 addRpcSub(std::string const& strUrl, InfoSub::ref) override;
520 bool
521 tryRemoveRpcSub(std::string const& strUrl) override;
522
    void
    stop() override
    {
        // Shut down the recurring timers. Each cancel() can throw a
        // boost::system::system_error; catch and log each one individually
        // so a failure on one timer never prevents cancelling the others.
        {
            try
            {
                heartbeatTimer_.cancel();
            }
            catch (boost::system::system_error const& e)
            {
                JLOG(m_journal.error()) << "NetworkOPs: heartbeatTimer cancel error: " << e.what();
            }

            try
            {
                clusterTimer_.cancel();
            }
            catch (boost::system::system_error const& e)
            {
                JLOG(m_journal.error()) << "NetworkOPs: clusterTimer cancel error: " << e.what();
            }

            try
            {
                accountHistoryTxTimer_.cancel();
            }
            catch (boost::system::system_error const& e)
            {
                JLOG(m_journal.error()) << "NetworkOPs: accountHistoryTxTimer cancel error: " << e.what();
            }
        }
        // Make sure that any waitHandlers pending in our timers are done.
        using namespace std::chrono_literals;
        waitHandlerCounter_.join("NetworkOPs", 1s, m_journal);
    }
558
559 void
560 stateAccounting(Json::Value& obj) override;
561
562private:
563 void
564 setTimer(
565 boost::asio::steady_timer& timer,
566 std::chrono::milliseconds const& expiry_time,
567 std::function<void()> onExpire,
568 std::function<void()> onError);
569 void
571 void
573 void
575 void
577
579 transJson(
580 std::shared_ptr<STTx const> const& transaction,
581 TER result,
582 bool validated,
585
586 void
589 AcceptedLedgerTx const& transaction,
590 bool last);
591
592 void
595 AcceptedLedgerTx const& transaction,
596 bool last);
597
598 void
601 std::shared_ptr<STTx const> const& transaction,
602 TER result);
603
604 void
605 pubServer();
606 void
608
610 getHostId(bool forAdmin);
611
612private:
616
617 /*
618 * With a validated ledger to separate history and future, the node
619 * streams historical txns with negative indexes starting from -1,
620 * and streams future txns starting from index 0.
621 * The SubAccountHistoryIndex struct maintains these indexes.
622 * It also has a flag stopHistorical_ for stopping streaming
623 * the historical txns.
624 */
660
664 void
666 void
668 void
670
673
675
677
679
684
686 boost::asio::steady_timer heartbeatTimer_;
687 boost::asio::steady_timer clusterTimer_;
688 boost::asio::steady_timer accountHistoryTxTimer_;
689
691
694
696
698
701
703
705
706 enum SubTypes {
707 sLedger, // Accepted ledgers.
708 sManifests, // Received validator manifests.
709 sServer, // When server changes connectivity state.
710 sTransactions, // All accepted transactions.
711 sRTTransactions, // All proposed and accepted transactions.
712 sValidations, // Received validations.
713 sPeerStatus, // Peer status changes.
714 sConsensusPhase, // Consensus phase
715 sBookChanges, // Per-ledger order book changes
716 sLastEntry // Any new entry must be ADDED ABOVE this one
717 };
718
720
722
724
725 // Whether we are in standalone mode.
726 bool const m_standalone;
727
728 // The number of nodes that we need to consider ourselves connected.
730
731 // Transaction batching.
736
738
741
742private:
743 struct Stats
744 {
745 template <class Handler>
746 Stats(Handler const& handler, beast::insight::Collector::ptr const& collector)
747 : hook(collector->make_hook(handler))
748 , disconnected_duration(collector->make_gauge("State_Accounting", "Disconnected_duration"))
749 , connected_duration(collector->make_gauge("State_Accounting", "Connected_duration"))
750 , syncing_duration(collector->make_gauge("State_Accounting", "Syncing_duration"))
751 , tracking_duration(collector->make_gauge("State_Accounting", "Tracking_duration"))
752 , full_duration(collector->make_gauge("State_Accounting", "Full_duration"))
753 , disconnected_transitions(collector->make_gauge("State_Accounting", "Disconnected_transitions"))
754 , connected_transitions(collector->make_gauge("State_Accounting", "Connected_transitions"))
755 , syncing_transitions(collector->make_gauge("State_Accounting", "Syncing_transitions"))
756 , tracking_transitions(collector->make_gauge("State_Accounting", "Tracking_transitions"))
757 , full_transitions(collector->make_gauge("State_Accounting", "Full_transitions"))
758 {
759 }
760
767
773 };
774
775 std::mutex m_statsMutex; // Mutex to lock m_stats
777
778private:
779 void
781};
782
783//------------------------------------------------------------------------------
784
785static std::array<char const*, 5> const stateNames{{"disconnected", "connected", "syncing", "tracking", "full"}};
786
788
795
796static auto const genesisAccountId =
798
799//------------------------------------------------------------------------------
800inline OperatingMode
802{
803 return mMode;
804}
805
inline std::string
NetworkOPsImp::strOperatingMode(bool const admin /* = false */) const
{
    // Convenience overload: report the current operating mode (mMode).
    return strOperatingMode(mMode, admin);
}
811
812inline void
817
818inline void
823
824inline void
829
830inline bool
835
836inline bool
841
844{
845 static std::string const hostname = boost::asio::ip::host_name();
846
847 if (forAdmin)
848 return hostname;
849
850 // For non-admin uses hash the node public key into a
851 // single RFC1751 word:
852 static std::string const shroudedHostId = [this]() {
853 auto const& id = registry_.app().nodeIdentity();
854
855 return RFC1751::getWordFromBlob(id.first.data(), id.first.size());
856 }();
857
858 return shroudedHostId;
859}
860
861void
863{
865
866 // Only do this work if a cluster is configured
867 if (registry_.cluster().size() != 0)
869}
870
871void
873 boost::asio::steady_timer& timer,
874 std::chrono::milliseconds const& expiry_time,
875 std::function<void()> onExpire,
876 std::function<void()> onError)
877{
878 // Only start the timer if waitHandlerCounter_ is not yet joined.
879 if (auto optionalCountedHandler =
880 waitHandlerCounter_.wrap([this, onExpire, onError](boost::system::error_code const& e) {
881 if ((e.value() == boost::system::errc::success) && (!m_job_queue.isStopped()))
882 {
883 onExpire();
884 }
885 // Recover as best we can if an unexpected error occurs.
886 if (e.value() != boost::system::errc::success && e.value() != boost::asio::error::operation_aborted)
887 {
888 // Try again later and hope for the best.
889 JLOG(m_journal.error()) << "Timer got error '" << e.message() << "'. Restarting timer.";
890 onError();
891 }
892 }))
893 {
894 timer.expires_after(expiry_time);
895 timer.async_wait(std::move(*optionalCountedHandler));
896 }
897}
898
899void
900NetworkOPsImp::setHeartbeatTimer()
901{
902 setTimer(
903 heartbeatTimer_,
904 mConsensus.parms().ledgerGRANULARITY,
905 [this]() { m_job_queue.addJob(jtNETOP_TIMER, "NetHeart", [this]() { processHeartbeatTimer(); }); },
906 [this]() { setHeartbeatTimer(); });
907}
908
909void
910NetworkOPsImp::setClusterTimer()
911{
912 using namespace std::chrono_literals;
913
914 setTimer(
915 clusterTimer_,
916 10s,
917 [this]() { m_job_queue.addJob(jtNETOP_CLUSTER, "NetCluster", [this]() { processClusterTimer(); }); },
918 [this]() { setClusterTimer(); });
919}
920
921void
922NetworkOPsImp::setAccountHistoryJobTimer(SubAccountHistoryInfoWeak subInfo)
923{
924 JLOG(m_journal.debug()) << "Scheduling AccountHistory job for account " << toBase58(subInfo.index_->accountId_);
925 using namespace std::chrono_literals;
926 setTimer(
927 accountHistoryTxTimer_,
928 4s,
929 [this, subInfo]() { addAccountHistoryJob(subInfo); },
930 [this, subInfo]() { setAccountHistoryJobTimer(subInfo); });
931}
932
void
NetworkOPsImp::processHeartbeatTimer()
{
    // Periodic heartbeat: check peer connectivity, adjust the operating
    // mode accordingly, drive the consensus timer, then re-arm itself.
    RclConsensusLogger clog("Heartbeat Timer", mConsensus.validating(), m_journal);
    {
        std::unique_lock lock{registry_.app().getMasterMutex()};

        // VFALCO NOTE This is for diagnosing a crash on exit
        LoadManager& mgr(registry_.getLoadManager());
        mgr.heartbeat();

        std::size_t const numPeers = registry_.overlay().size();

        // do we have sufficient peers? If not, we are disconnected.
        if (numPeers < minPeerCount_)
        {
            if (mMode != OperatingMode::DISCONNECTED)
            {
                setMode(OperatingMode::DISCONNECTED);
                // NOTE(review): the declaration of 'ss' (presumably a
                // std::stringstream) appears to be on a line lost in this
                // extract — confirm against upstream.
                ss << "Node count (" << numPeers << ") has fallen "
                   << "below required minimum (" << minPeerCount_ << ").";
                JLOG(m_journal.warn()) << ss.str();
                CLOG(clog.ss()) << "set mode to DISCONNECTED: " << ss.str();
            }
            else
            {
                CLOG(clog.ss()) << "already DISCONNECTED. too few peers (" << numPeers << "), need at least "
                                << minPeerCount_;
            }

            // MasterMutex lock need not be held to call setHeartbeatTimer()
            lock.unlock();
            // We do not call mConsensus.timerEntry until there are enough
            // peers providing meaningful inputs to consensus
            setHeartbeatTimer();

            return;
        }

        if (mMode == OperatingMode::DISCONNECTED)
        {
            setMode(OperatingMode::CONNECTED);
            JLOG(m_journal.info()) << "Node count (" << numPeers << ") is sufficient.";
            CLOG(clog.ss()) << "setting mode to CONNECTED based on " << numPeers << " peers. ";
        }

        // Check if the last validated ledger forces a change between these
        // states.
        auto origMode = mMode.load();
        CLOG(clog.ss()) << "mode: " << strOperatingMode(origMode, true);
        // Re-asserting the current mode lets setMode() apply any
        // ledger-driven adjustments for these two states.
        if (mMode == OperatingMode::SYNCING)
            setMode(OperatingMode::SYNCING);
        else if (mMode == OperatingMode::CONNECTED)
            setMode(OperatingMode::CONNECTED);
        auto newMode = mMode.load();
        if (origMode != newMode)
        {
            CLOG(clog.ss()) << ", changing to " << strOperatingMode(newMode, true);
        }
        CLOG(clog.ss()) << ". ";
    }

    // Advance consensus with the current network close time.
    mConsensus.timerEntry(registry_.timeKeeper().closeTime(), clog.ss());

    // Report (and record) any consensus phase change since the last beat.
    CLOG(clog.ss()) << "consensus phase " << to_string(mLastConsensusPhase);
    ConsensusPhase const currPhase = mConsensus.phase();
    if (mLastConsensusPhase != currPhase)
    {
        reportConsensusStateChange(currPhase);
        mLastConsensusPhase = currPhase;
        CLOG(clog.ss()) << " changed to " << to_string(mLastConsensusPhase);
    }
    CLOG(clog.ss()) << ". ";

    setHeartbeatTimer();
}
1010
1011void
1012NetworkOPsImp::processClusterTimer()
1013{
1014 if (registry_.cluster().size() == 0)
1015 return;
1016
1017 using namespace std::chrono_literals;
1018
1019 bool const update = registry_.cluster().update(
1020 registry_.app().nodeIdentity().first,
1021 "",
1022 (m_ledgerMaster.getValidatedLedgerAge() <= 4min) ? registry_.getFeeTrack().getLocalFee() : 0,
1023 registry_.timeKeeper().now());
1024
1025 if (!update)
1026 {
1027 JLOG(m_journal.debug()) << "Too soon to send cluster update";
1028 setClusterTimer();
1029 return;
1030 }
1031
1032 protocol::TMCluster cluster;
1033 registry_.cluster().for_each([&cluster](ClusterNode const& node) {
1034 protocol::TMClusterNode& n = *cluster.add_clusternodes();
1035 n.set_publickey(toBase58(TokenType::NodePublic, node.identity()));
1036 n.set_reporttime(node.getReportTime().time_since_epoch().count());
1037 n.set_nodeload(node.getLoadFee());
1038 if (!node.name().empty())
1039 n.set_nodename(node.name());
1040 });
1041
1042 Resource::Gossip gossip = registry_.getResourceManager().exportConsumers();
1043 for (auto& item : gossip.items)
1044 {
1045 protocol::TMLoadSource& node = *cluster.add_loadsources();
1046 node.set_name(to_string(item.address));
1047 node.set_cost(item.balance);
1048 }
1049 registry_.overlay().foreach(send_if(std::make_shared<Message>(cluster, protocol::mtCLUSTER), peer_in_cluster()));
1050 setClusterTimer();
1051}
1052
1053//------------------------------------------------------------------------------
1054
1056NetworkOPsImp::strOperatingMode(OperatingMode const mode, bool const admin) const
1057{
1058 if (mode == OperatingMode::FULL && admin)
1059 {
1060 auto const consensusMode = mConsensus.mode();
1061 if (consensusMode != ConsensusMode::wrongLedger)
1062 {
1063 if (consensusMode == ConsensusMode::proposing)
1064 return "proposing";
1065
1066 if (mConsensus.validating())
1067 return "validating";
1068 }
1069 }
1070
1071 return states_[static_cast<std::size_t>(mode)];
1072}
1073
1074void
1075NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)
1076{
1077 if (isNeedNetworkLedger())
1078 {
1079 // Nothing we can do if we've never been in sync
1080 return;
1081 }
1082
1083 // Enforce Network bar for batch txn
1084 if (iTrans->isFlag(tfInnerBatchTxn) && m_ledgerMaster.getValidatedRules().enabled(featureBatch))
1085 {
1086 JLOG(m_journal.error()) << "Submitted transaction invalid: tfInnerBatchTxn flag present.";
1087 return;
1088 }
1089
1090 // this is an asynchronous interface
1091 auto const trans = sterilize(*iTrans);
1092
1093 auto const txid = trans->getTransactionID();
1094 auto const flags = registry_.getHashRouter().getFlags(txid);
1095
1096 if ((flags & HashRouterFlags::BAD) != HashRouterFlags::UNDEFINED)
1097 {
1098 JLOG(m_journal.warn()) << "Submitted transaction cached bad";
1099 return;
1100 }
1101
1102 try
1103 {
1104 auto const [validity, reason] = checkValidity(
1105 registry_.getHashRouter(), *trans, m_ledgerMaster.getValidatedRules(), registry_.app().config());
1106
1107 if (validity != Validity::Valid)
1108 {
1109 JLOG(m_journal.warn()) << "Submitted transaction invalid: " << reason;
1110 return;
1111 }
1112 }
1113 catch (std::exception const& ex)
1114 {
1115 JLOG(m_journal.warn()) << "Exception checking transaction " << txid << ": " << ex.what();
1116
1117 return;
1118 }
1119
1120 std::string reason;
1121
1122 auto tx = std::make_shared<Transaction>(trans, reason, registry_.app());
1123
1124 m_job_queue.addJob(jtTRANSACTION, "SubmitTxn", [this, tx]() {
1125 auto t = tx;
1126 processTransaction(t, false, false, FailHard::no);
1127 });
1128}
1129
1130bool
1131NetworkOPsImp::preProcessTransaction(std::shared_ptr<Transaction>& transaction)
1132{
1133 auto const newFlags = registry_.getHashRouter().getFlags(transaction->getID());
1134
1135 if ((newFlags & HashRouterFlags::BAD) != HashRouterFlags::UNDEFINED)
1136 {
1137 // cached bad
1138 JLOG(m_journal.warn()) << transaction->getID() << ": cached bad!\n";
1139 transaction->setStatus(INVALID);
1140 transaction->setResult(temBAD_SIGNATURE);
1141 return false;
1142 }
1143
1144 auto const view = m_ledgerMaster.getCurrentLedger();
1145
1146 // This function is called by several different parts of the codebase
1147 // under no circumstances will we ever accept an inner txn within a batch
1148 // txn from the network.
1149 auto const sttx = *transaction->getSTransaction();
1150 if (sttx.isFlag(tfInnerBatchTxn) && view->rules().enabled(featureBatch))
1151 {
1152 transaction->setStatus(INVALID);
1153 transaction->setResult(temINVALID_FLAG);
1154 registry_.getHashRouter().setFlags(transaction->getID(), HashRouterFlags::BAD);
1155 return false;
1156 }
1157
1158 // NOTE ximinez - I think this check is redundant,
1159 // but I'm not 100% sure yet.
1160 // If so, only cost is looking up HashRouter flags.
1161 auto const [validity, reason] =
1162 checkValidity(registry_.getHashRouter(), sttx, view->rules(), registry_.app().config());
1163 XRPL_ASSERT(validity == Validity::Valid, "xrpl::NetworkOPsImp::processTransaction : valid validity");
1164
1165 // Not concerned with local checks at this point.
1166 if (validity == Validity::SigBad)
1167 {
1168 JLOG(m_journal.info()) << "Transaction has bad signature: " << reason;
1169 transaction->setStatus(INVALID);
1170 transaction->setResult(temBAD_SIGNATURE);
1171 registry_.getHashRouter().setFlags(transaction->getID(), HashRouterFlags::BAD);
1172 return false;
1173 }
1174
1175 // canonicalize can change our pointer
1176 registry_.getMasterTransaction().canonicalize(&transaction);
1177
1178 return true;
1179}
1180
1181void
1182NetworkOPsImp::processTransaction(
1183 std::shared_ptr<Transaction>& transaction,
1184 bool bUnlimited,
1185 bool bLocal,
1186 FailHard failType)
1187{
1188 auto ev = m_job_queue.makeLoadEvent(jtTXN_PROC, "ProcessTXN");
1189
1190 // preProcessTransaction can change our pointer
1191 if (!preProcessTransaction(transaction))
1192 return;
1193
1194 if (bLocal)
1195 doTransactionSync(transaction, bUnlimited, failType);
1196 else
1197 doTransactionAsync(transaction, bUnlimited, failType);
1198}
1199
1200void
1201NetworkOPsImp::doTransactionAsync(std::shared_ptr<Transaction> transaction, bool bUnlimited, FailHard failType)
1202{
1203 std::lock_guard lock(mMutex);
1204
1205 if (transaction->getApplying())
1206 return;
1207
1208 mTransactions.push_back(TransactionStatus(transaction, bUnlimited, false, failType));
1209 transaction->setApplying();
1210
1211 if (mDispatchState == DispatchState::none)
1212 {
1213 if (m_job_queue.addJob(jtBATCH, "TxBatchAsync", [this]() { transactionBatch(); }))
1214 {
1215 mDispatchState = DispatchState::scheduled;
1216 }
1217 }
1218}
1219
1220void
1221NetworkOPsImp::doTransactionSync(std::shared_ptr<Transaction> transaction, bool bUnlimited, FailHard failType)
1222{
1223 std::unique_lock<std::mutex> lock(mMutex);
1224
1225 if (!transaction->getApplying())
1226 {
1227 mTransactions.push_back(TransactionStatus(transaction, bUnlimited, true, failType));
1228 transaction->setApplying();
1229 }
1230
1231 doTransactionSyncBatch(
1232 lock, [&transaction](std::unique_lock<std::mutex> const&) { return transaction->getApplying(); });
1233}
1234
void
NetworkOPsImp::doTransactionSyncBatch(
    // NOTE(review): a parameter line (the std::unique_lock<std::mutex>&
    // 'lock' used below) appears to be missing from this extract —
    // confirm against upstream.
    std::function<bool(std::unique_lock<std::mutex> const&)> retryCallback)
{
    // Repeatedly apply queued transactions — or wait for an in-flight batch
    // job — until retryCallback reports nothing left to wait for.
    do
    {
        if (mDispatchState == DispatchState::running)
        {
            // A batch processing job is already running, so wait.
            mCond.wait(lock);
        }
        else
        {
            apply(lock);

            if (mTransactions.size())
            {
                // More transactions need to be applied, but by another job.
                if (m_job_queue.addJob(jtBATCH, "TxBatchSync", [this]() { transactionBatch(); }))
                {
                    mDispatchState = DispatchState::scheduled;
                }
            }
        }
    } while (retryCallback(lock));
}
1262
void
NetworkOPsImp::processTransactionSet(CanonicalTXSet const& set)
{
    // Pre-check every transaction in the set, then queue the survivors for
    // batch application to the open ledger.
    auto ev = m_job_queue.makeLoadEvent(jtTXN_PROC, "ProcessTXNSet");
    // NOTE(review): the declaration of 'candidates' (presumably a vector of
    // std::shared_ptr<Transaction>) appears to be on a line lost in this
    // extract — confirm against upstream.
    candidates.reserve(set.size());
    for (auto const& [_, tx] : set)
    {
        std::string reason;
        auto transaction = std::make_shared<Transaction>(tx, reason, registry_.app());

        if (transaction->getStatus() == INVALID)
        {
            if (!reason.empty())
            {
                JLOG(m_journal.trace()) << "Exception checking transaction: " << reason;
            }
            // Remember the bad transaction so it is not processed again.
            registry_.getHashRouter().setFlags(tx->getTransactionID(), HashRouterFlags::BAD);
            continue;
        }

        // preProcessTransaction can change our pointer
        if (!preProcessTransaction(transaction))
            continue;

        candidates.emplace_back(transaction);
    }

    std::vector<TransactionStatus> transactions;
    transactions.reserve(candidates.size());

    std::unique_lock lock(mMutex);

    // Queue each candidate that is not already being applied.
    for (auto& transaction : candidates)
    {
        if (!transaction->getApplying())
        {
            transactions.emplace_back(transaction, false, false, FailHard::no);
            transaction->setApplying();
        }
    }

    // Merge into the shared queue: swap when it is empty, append otherwise.
    if (mTransactions.empty())
        mTransactions.swap(transactions);
    else
    {
        mTransactions.reserve(mTransactions.size() + transactions.size());
        for (auto& t : transactions)
            mTransactions.push_back(std::move(t));
    }
    if (mTransactions.empty())
    {
        JLOG(m_journal.debug()) << "No transaction to process!";
        return;
    }

    // Apply until none of the queued transactions is still pending.
    doTransactionSyncBatch(lock, [&](std::unique_lock<std::mutex> const&) {
        XRPL_ASSERT(lock.owns_lock(), "xrpl::NetworkOPsImp::processTransactionSet has lock");
        return std::any_of(
            mTransactions.begin(), mTransactions.end(), [](auto const& t) { return t.transaction->getApplying(); });
    });
}
1325
1326void
1327NetworkOPsImp::transactionBatch()
1328{
1329 std::unique_lock<std::mutex> lock(mMutex);
1330
1331 if (mDispatchState == DispatchState::running)
1332 return;
1333
1334 while (mTransactions.size())
1335 {
1336 apply(lock);
1337 }
1338}
1339
// Apply one batch of submitted transactions to the open ledger, publish the
// per-transaction outcomes, and hand back any follow-on work. Called with
// `batchLock` held; the lock is released while applying and re-acquired at
// the end to merge newly popped transactions into mTransactions.
void
NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
{
    // Take ownership of the whole pending batch under the lock.
    std::vector<TransactionStatus> transactions;
    mTransactions.swap(transactions);
    XRPL_ASSERT(!transactions.empty(), "xrpl::NetworkOPsImp::apply : non-empty transactions");
    XRPL_ASSERT(mDispatchState != DispatchState::running, "xrpl::NetworkOPsImp::apply : is not running");

    mDispatchState = DispatchState::running;

    batchLock.unlock();

    {
        // Lock both the master and ledger mutexes together (std::lock avoids
        // deadlock) before modifying the open ledger.
        std::unique_lock masterLock{registry_.app().getMasterMutex(), std::defer_lock};
        bool changed = false;
        {
            std::unique_lock ledgerLock{m_ledgerMaster.peekMutex(), std::defer_lock};
            std::lock(masterLock, ledgerLock);

            registry_.openLedger().modify([&](OpenView& view, beast::Journal j) {
                for (TransactionStatus& e : transactions)
                {
                    // we check before adding to the batch
                    ApplyFlags flags = tapNONE;
                    if (e.admin)
                        flags |= tapUNLIMITED;

                    if (e.failType == FailHard::yes)
                        flags |= tapFAIL_HARD;

                    auto const result =
                        registry_.getTxQ().apply(registry_.app(), view, e.transaction->getSTransaction(), flags, j);
                    e.result = result.ter;
                    e.applied = result.applied;
                    changed = changed || result.applied;
                }
                return changed;
            });
        }
        if (changed)
            reportFeeChange();

        // Remember the validated ledger sequence (if any) so each
        // transaction's ledger state can be recorded below.
        std::optional<LedgerIndex> validatedLedgerIndex;
        if (auto const l = m_ledgerMaster.getValidatedLedger())
            validatedLedgerIndex = l->header().seq;

        auto newOL = registry_.openLedger().current();
        for (TransactionStatus& e : transactions)
        {
            e.transaction->clearSubmitResult();

            if (e.applied)
            {
                pubProposedTransaction(newOL, e.transaction->getSTransaction(), e.result);
                e.transaction->setApplied();
            }

            e.transaction->setResult(e.result);

            // A malformed transaction can never become valid; remember that.
            if (isTemMalformed(e.result))
                registry_.getHashRouter().setFlags(e.transaction->getID(), HashRouterFlags::BAD);

#ifdef DEBUG
            if (e.result != tesSUCCESS)
            {
                std::string token, human;

                if (transResultInfo(e.result, token, human))
                {
                    JLOG(m_journal.info()) << "TransactionResult: " << token << ": " << human;
                }
            }
#endif

            bool addLocal = e.local;

            if (e.result == tesSUCCESS)
            {
                JLOG(m_journal.debug()) << "Transaction is now included in open ledger";
                e.transaction->setStatus(INCLUDED);

                // Pop as many "reasonable" transactions for this account as
                // possible. "Reasonable" means they have sequential sequence
                // numbers, or use tickets.
                auto const& txCur = e.transaction->getSTransaction();

                std::size_t count = 0;
                for (auto txNext = m_ledgerMaster.popAcctTransaction(txCur); txNext && count < maxPoppedTransactions;
                     txNext = m_ledgerMaster.popAcctTransaction(txCur), ++count)
                {
                    // Re-acquire the batch lock before touching submit_held /
                    // the applying flag.
                    if (!batchLock.owns_lock())
                        batchLock.lock();
                    std::string reason;
                    auto const trans = sterilize(*txNext);
                    auto t = std::make_shared<Transaction>(trans, reason, registry_.app());
                    if (t->getApplying())
                        break;
                    submit_held.emplace_back(t, false, false, FailHard::no);
                    t->setApplying();
                }
                if (batchLock.owns_lock())
                    batchLock.unlock();
            }
            else if (e.result == tefPAST_SEQ)
            {
                // duplicate or conflict
                JLOG(m_journal.info()) << "Transaction is obsolete";
                e.transaction->setStatus(OBSOLETE);
            }
            else if (e.result == terQUEUED)
            {
                JLOG(m_journal.debug()) << "Transaction is likely to claim a"
                                        << " fee, but is queued until fee drops";

                e.transaction->setStatus(HELD);
                // Add to held transactions, because it could get
                // kicked out of the queue, and this will try to
                // put it back.
                m_ledgerMaster.addHeldTransaction(e.transaction);
                e.transaction->setQueued();
                e.transaction->setKept();
            }
            else if (isTerRetry(e.result) || isTelLocal(e.result) || isTefFailure(e.result))
            {
                if (e.failType != FailHard::yes)
                {
                    auto const lastLedgerSeq = e.transaction->getSTransaction()->at(~sfLastLedgerSequence);
                    auto const ledgersLeft = lastLedgerSeq ? *lastLedgerSeq - m_ledgerMaster.getCurrentLedgerIndex()
                    // If any of these conditions are met, the transaction can
                    // be held:
                    // 1. It was submitted locally. (Note that this flag is only
                    //    true on the initial submission.)
                    // 2. The transaction has a LastLedgerSequence, and the
                    //    LastLedgerSequence is fewer than LocalTxs::holdLedgers
                    //    (5) ledgers into the future. (Remember that an
                    //    unseated optional compares as less than all seated
                    //    values, so it has to be checked explicitly first.)
                    // 3. The HashRouterFlags::BAD flag is not set on the txID.
                    // (setFlags
                    //    checks before setting. If the flag is set, it returns
                    //    false, which means it's been held once without one of
                    //    the other conditions, so don't hold it again. Time's
                    //    up!)
                    //
                    if (e.local || (ledgersLeft && ledgersLeft <= LocalTxs::holdLedgers) ||
                        registry_.getHashRouter().setFlags(e.transaction->getID(), HashRouterFlags::HELD))
                    {
                        // transaction should be held
                        JLOG(m_journal.debug()) << "Transaction should be held: " << e.result;
                        e.transaction->setStatus(HELD);
                        m_ledgerMaster.addHeldTransaction(e.transaction);
                        e.transaction->setKept();
                    }
                    else
                        JLOG(m_journal.debug())
                            << "Not holding transaction " << e.transaction->getID() << ": "
                            << (e.local ? "local" : "network") << ", "
                            << "result: " << e.result
                            << " ledgers left: " << (ledgersLeft ? to_string(*ledgersLeft) : "unspecified");
                }
            }
            else
            {
                JLOG(m_journal.debug()) << "Status other than success " << e.result;
                e.transaction->setStatus(INVALID);
            }

            // fail_hard submissions that did not succeed are neither kept
            // locally nor relayed.
            auto const enforceFailHard = e.failType == FailHard::yes && !isTesSuccess(e.result);

            if (addLocal && !enforceFailHard)
            {
                m_localTX->push_back(m_ledgerMaster.getCurrentLedgerIndex(), e.transaction->getSTransaction());
                e.transaction->setKept();
            }

            // Relay if applied, queued, or (when not fully synced) a local
            // non-fail_hard submission.
            if ((e.applied || ((mMode != OperatingMode::FULL) && (e.failType != FailHard::yes) && e.local) ||
                 (e.result == terQUEUED)) &&
                !enforceFailHard)
            {
                auto const toSkip = registry_.getHashRouter().shouldRelay(e.transaction->getID());
                if (auto const sttx = *(e.transaction->getSTransaction()); toSkip &&
                    // Skip relaying if it's an inner batch txn. The flag should
                    // only be set if the Batch feature is enabled. If Batch is
                    // not enabled, the flag is always invalid, so don't relay
                    // it regardless.
                    !(sttx.isFlag(tfInnerBatchTxn)))
                {
                    protocol::TMTransaction tx;
                    Serializer s;

                    sttx.add(s);
                    tx.set_rawtransaction(s.data(), s.size());
                    tx.set_status(protocol::tsCURRENT);
                    tx.set_receivetimestamp(registry_.timeKeeper().now().time_since_epoch().count());
                    tx.set_deferred(e.result == terQUEUED);
                    // FIXME: This should be when we received it
                    registry_.overlay().relay(e.transaction->getID(), tx, *toSkip);
                    e.transaction->setBroadcast();
                }
            }

            if (validatedLedgerIndex)
            {
                auto [fee, accountSeq, availableSeq] =
                    registry_.getTxQ().getTxRequiredFeeAndSeq(*newOL, e.transaction->getSTransaction());
                e.transaction->setCurrentLedgerState(*validatedLedgerIndex, fee, accountSeq, availableSeq);
            }
        }
    }

    batchLock.lock();

    for (TransactionStatus& e : transactions)
        e.transaction->clearApplying();

    // Queue up any follow-on account transactions popped above.
    if (!submit_held.empty())
    {
        if (mTransactions.empty())
            mTransactions.swap(submit_held);
        else
        {
            mTransactions.reserve(mTransactions.size() + submit_held.size());
            for (auto& e : submit_held)
                mTransactions.push_back(std::move(e));
        }
    }

    mCond.notify_all();

    mDispatchState = DispatchState::none;
}
1573
1574//
1575// Owner functions
1576//
1577
1579NetworkOPsImp::getOwnerInfo(std::shared_ptr<ReadView const> lpLedger, AccountID const& account)
1580{
1581 Json::Value jvObjects(Json::objectValue);
1582 auto root = keylet::ownerDir(account);
1583 auto sleNode = lpLedger->read(keylet::page(root));
1584 if (sleNode)
1585 {
1586 std::uint64_t uNodeDir;
1587
1588 do
1589 {
1590 for (auto const& uDirEntry : sleNode->getFieldV256(sfIndexes))
1591 {
1592 auto sleCur = lpLedger->read(keylet::child(uDirEntry));
1593 XRPL_ASSERT(sleCur, "xrpl::NetworkOPsImp::getOwnerInfo : non-null child SLE");
1594
1595 switch (sleCur->getType())
1596 {
1597 case ltOFFER:
1598 if (!jvObjects.isMember(jss::offers))
1599 jvObjects[jss::offers] = Json::Value(Json::arrayValue);
1600
1601 jvObjects[jss::offers].append(sleCur->getJson(JsonOptions::none));
1602 break;
1603
1604 case ltRIPPLE_STATE:
1605 if (!jvObjects.isMember(jss::ripple_lines))
1606 {
1607 jvObjects[jss::ripple_lines] = Json::Value(Json::arrayValue);
1608 }
1609
1610 jvObjects[jss::ripple_lines].append(sleCur->getJson(JsonOptions::none));
1611 break;
1612
1613 case ltACCOUNT_ROOT:
1614 case ltDIR_NODE:
1615 // LCOV_EXCL_START
1616 default:
1617 UNREACHABLE(
1618 "xrpl::NetworkOPsImp::getOwnerInfo : invalid "
1619 "type");
1620 break;
1621 // LCOV_EXCL_STOP
1622 }
1623 }
1624
1625 uNodeDir = sleNode->getFieldU64(sfIndexNext);
1626
1627 if (uNodeDir)
1628 {
1629 sleNode = lpLedger->read(keylet::page(root, uNodeDir));
1630 XRPL_ASSERT(sleNode, "xrpl::NetworkOPsImp::getOwnerInfo : read next page");
1631 }
1632 } while (uNodeDir);
1633 }
1634
1635 return jvObjects;
1636}
1637
1638//
1639// Other
1640//
1641
1642inline bool
1643NetworkOPsImp::isBlocked()
1644{
1645 return isAmendmentBlocked() || isUNLBlocked();
1646}
1647
// Reports whether the amendment-blocked flag (set via setAmendmentBlocked)
// is currently latched.
inline bool
NetworkOPsImp::isAmendmentBlocked()
{
    return amendmentBlocked_;
}
1653
// Latch the amendment-blocked flag, then drop the operating mode to
// CONNECTED. The flag is only ever set here; nothing in this file clears it.
void
NetworkOPsImp::setAmendmentBlocked()
{
    amendmentBlocked_ = true;
    setMode(OperatingMode::CONNECTED);
}
1660
1661inline bool
1662NetworkOPsImp::isAmendmentWarned()
1663{
1664 return !amendmentBlocked_ && amendmentWarned_;
1665}
1666
// Raise the amendment warning flag (reported via isAmendmentWarned while the
// server is not amendment blocked).
inline void
NetworkOPsImp::setAmendmentWarned()
{
    amendmentWarned_ = true;
}
1672
// Clear the amendment warning flag.
inline void
NetworkOPsImp::clearAmendmentWarned()
{
    amendmentWarned_ = false;
}
1678
// Reports whether the UNL-blocked flag (set via setUNLBlocked) is latched.
inline bool
NetworkOPsImp::isUNLBlocked()
{
    return unlBlocked_;
}
1684
// Latch the UNL-blocked flag, then drop the operating mode to CONNECTED.
// Cleared via clearUNLBlocked().
void
NetworkOPsImp::setUNLBlocked()
{
    unlBlocked_ = true;
    setMode(OperatingMode::CONNECTED);
}
1691
// Clear the UNL-blocked flag. Note this does not restore the operating mode
// lowered by setUNLBlocked().
inline void
NetworkOPsImp::clearUNLBlocked()
{
    unlBlocked_ = false;
}
1697
// Decide whether our last closed ledger is the network's preferred one and,
// if not, switch to the preferred ledger. On return, `networkClosed` holds
// the hash this node considers the network's last closed ledger.
bool
NetworkOPsImp::checkLastClosedLedger(Overlay::PeerSequence const& peerList, uint256& networkClosed)
{
    // Returns true if there's an *abnormal* ledger issue, normal changing in
    // TRACKING mode should return false. Do we have sufficient validations for
    // our last closed ledger? Or do sufficient nodes agree? And do we have no
    // better ledger available? If so, we are either tracking or full.

    JLOG(m_journal.trace()) << "NetworkOPsImp::checkLastClosedLedger";

    auto const ourClosed = m_ledgerMaster.getClosedLedger();

    if (!ourClosed)
        return false;

    uint256 closedLedger = ourClosed->header().hash;
    uint256 prevClosedLedger = ourClosed->header().parentHash;
    JLOG(m_journal.trace()) << "OurClosed: " << closedLedger;
    JLOG(m_journal.trace()) << "PrevClosed: " << prevClosedLedger;

    //-------------------------------------------------------------------------
    // Determine preferred last closed ledger

    auto& validations = registry_.getValidations();
    JLOG(m_journal.debug()) << "ValidationTrie " << Json::Compact(validations.getJsonTrie());

    // Will rely on peer LCL if no trusted validations exist
    // Our own LCL counts as one "peer" vote once we are at least TRACKING.
    peerCounts[closedLedger] = 0;
    if (mMode >= OperatingMode::TRACKING)
        peerCounts[closedLedger]++;

    for (auto& peer : peerList)
    {
        uint256 peerLedger = peer->getClosedLedgerHash();

        if (peerLedger.isNonZero())
            ++peerCounts[peerLedger];
    }

    for (auto const& it : peerCounts)
        JLOG(m_journal.debug()) << "L: " << it.first << " n=" << it.second;

    uint256 preferredLCL = validations.getPreferredLCL(
        RCLValidatedLedger{ourClosed, validations.adaptor().journal()},
        m_ledgerMaster.getValidLedgerIndex(),
        peerCounts);

    bool switchLedgers = preferredLCL != closedLedger;
    if (switchLedgers)
        closedLedger = preferredLCL;
    //-------------------------------------------------------------------------
    if (switchLedgers && (closedLedger == prevClosedLedger))
    {
        // don't switch to our own previous ledger
        JLOG(m_journal.info()) << "We won't switch to our own previous ledger";
        networkClosed = ourClosed->header().hash;
        switchLedgers = false;
    }
    else
        networkClosed = closedLedger;

    if (!switchLedgers)
        return false;

    // Try locally first, then fall back to acquiring the ledger from peers.
    auto consensus = m_ledgerMaster.getLedgerByHash(closedLedger);

    if (!consensus)
        consensus = registry_.getInboundLedgers().acquire(closedLedger, 0, InboundLedger::Reason::CONSENSUS);

    if (consensus &&
        (!m_ledgerMaster.canBeCurrent(consensus) ||
         !m_ledgerMaster.isCompatible(*consensus, m_journal.debug(), "Not switching")))
    {
        // Don't switch to a ledger not on the validated chain
        // or with an invalid close time or sequence
        networkClosed = ourClosed->header().hash;
        return false;
    }

    JLOG(m_journal.warn()) << "We are not running on the consensus ledger";
    JLOG(m_journal.info()) << "Our LCL: " << ourClosed->header().hash << getJson({*ourClosed, {}});
    JLOG(m_journal.info()) << "Net LCL " << closedLedger;

    // We are off the network's ledger: demote ourselves to CONNECTED.
    if ((mMode == OperatingMode::TRACKING) || (mMode == OperatingMode::FULL))
    {
        setMode(OperatingMode::CONNECTED);
    }

    if (consensus)
    {
        // FIXME: If this rewinds the ledger sequence, or has the same
        // sequence, we should update the status on any stored transactions
        // in the invalidated ledgers.
        switchLastClosedLedger(consensus);
    }

    return true;
}
1797
// Forcibly adopt `newLCL` as our last closed ledger (the abnormal "jump"
// path used when we discover we are not on the network's ledger), rebuild
// the open ledger on top of it, and announce the switch to all peers.
void
NetworkOPsImp::switchLastClosedLedger(std::shared_ptr<Ledger const> const& newLCL)
{
    // set the newLCL as our last closed ledger -- this is abnormal code
    JLOG(m_journal.error()) << "JUMP last closed ledger to " << newLCL->header().hash;

    clearNeedNetworkLedger();

    // Update fee computations.
    registry_.getTxQ().processClosedLedger(registry_.app(), *newLCL, true);

    // Caller must own master lock
    {
        // Apply tx in old open ledger to new
        // open ledger. Then apply local tx.

        auto retries = m_localTX->getTxSet();
        // Rules come from the last validated ledger when available, else
        // from the configured feature set.
        auto const lastVal = registry_.getLedgerMaster().getValidatedLedger();
        if (lastVal)
            rules = makeRulesGivenLedger(*lastVal, registry_.app().config().features);
        else
            rules.emplace(registry_.app().config().features);
        registry_.openLedger().accept(
            registry_.app(),
            *rules,
            newLCL,
            OrderedTxs({}),
            false,
            retries,
            tapNONE,
            "jump",
            [&](OpenView& view, beast::Journal j) {
                // Stuff the ledger with transactions from the queue.
                return registry_.getTxQ().accept(registry_.app(), view);
            });
    }

    m_ledgerMaster.switchLCL(newLCL);

    // Tell peers we switched, including the old and new ledger hashes.
    protocol::TMStatusChange s;
    s.set_newevent(protocol::neSWITCHED_LEDGER);
    s.set_ledgerseq(newLCL->header().seq);
    s.set_networktime(registry_.timeKeeper().now().time_since_epoch().count());
    s.set_ledgerhashprevious(newLCL->header().parentHash.begin(), newLCL->header().parentHash.size());
    s.set_ledgerhash(newLCL->header().hash.begin(), newLCL->header().hash.size());

    registry_.overlay().foreach(send_always(std::make_shared<Message>(s, protocol::mtSTATUS_CHANGE)));
}
1847
1848bool
1849NetworkOPsImp::beginConsensus(uint256 const& networkClosed, std::unique_ptr<std::stringstream> const& clog)
1850{
1851 XRPL_ASSERT(networkClosed.isNonZero(), "xrpl::NetworkOPsImp::beginConsensus : nonzero input");
1852
1853 auto closingInfo = m_ledgerMaster.getCurrentLedger()->header();
1854
1855 JLOG(m_journal.info()) << "Consensus time for #" << closingInfo.seq << " with LCL " << closingInfo.parentHash;
1856
1857 auto prevLedger = m_ledgerMaster.getLedgerByHash(closingInfo.parentHash);
1858
1859 if (!prevLedger)
1860 {
1861 // this shouldn't happen unless we jump ledgers
1862 if (mMode == OperatingMode::FULL)
1863 {
1864 JLOG(m_journal.warn()) << "Don't have LCL, going to tracking";
1865 setMode(OperatingMode::TRACKING);
1866 CLOG(clog) << "beginConsensus Don't have LCL, going to tracking. ";
1867 }
1868
1869 CLOG(clog) << "beginConsensus no previous ledger. ";
1870 return false;
1871 }
1872
1873 XRPL_ASSERT(
1874 prevLedger->header().hash == closingInfo.parentHash,
1875 "xrpl::NetworkOPsImp::beginConsensus : prevLedger hash matches "
1876 "parent");
1877 XRPL_ASSERT(
1878 closingInfo.parentHash == m_ledgerMaster.getClosedLedger()->header().hash,
1879 "xrpl::NetworkOPsImp::beginConsensus : closedLedger parent matches "
1880 "hash");
1881
1882 registry_.validators().setNegativeUNL(prevLedger->negativeUNL());
1883 TrustChanges const changes = registry_.validators().updateTrusted(
1884 registry_.getValidations().getCurrentNodeIDs(),
1885 closingInfo.parentCloseTime,
1886 *this,
1887 registry_.overlay(),
1888 registry_.getHashRouter());
1889
1890 if (!changes.added.empty() || !changes.removed.empty())
1891 {
1892 registry_.getValidations().trustChanged(changes.added, changes.removed);
1893 // Update the AmendmentTable so it tracks the current validators.
1894 registry_.getAmendmentTable().trustChanged(registry_.validators().getQuorumKeys().second);
1895 }
1896
1897 mConsensus.startRound(
1898 registry_.timeKeeper().closeTime(), networkClosed, prevLedger, changes.removed, changes.added, clog);
1899
1900 ConsensusPhase const currPhase = mConsensus.phase();
1901 if (mLastConsensusPhase != currPhase)
1902 {
1903 reportConsensusStateChange(currPhase);
1904 mLastConsensusPhase = currPhase;
1905 }
1906
1907 JLOG(m_journal.debug()) << "Initiating consensus engine";
1908 return true;
1909}
1910
1911bool
1912NetworkOPsImp::processTrustedProposal(RCLCxPeerPos peerPos)
1913{
1914 auto const& peerKey = peerPos.publicKey();
1915 if (validatorPK_ == peerKey || validatorMasterPK_ == peerKey)
1916 {
1917 // Could indicate a operator misconfiguration where two nodes are
1918 // running with the same validator key configured, so this isn't fatal,
1919 // and it doesn't necessarily indicate peer misbehavior. But since this
1920 // is a trusted message, it could be a very big deal. Either way, we
1921 // don't want to relay the proposal. Note that the byzantine behavior
1922 // detection in handleNewValidation will notify other peers.
1923 //
1924 // Another, innocuous explanation is unusual message routing and delays,
1925 // causing this node to receive its own messages back.
1926 JLOG(m_journal.error()) << "Received a proposal signed by MY KEY from a peer. This may "
1927 "indicate a misconfiguration where another node has the same "
1928 "validator key, or may be caused by unusual message routing and "
1929 "delays.";
1930 return false;
1931 }
1932
1933 return mConsensus.peerProposal(registry_.timeKeeper().closeTime(), peerPos);
1934}
1935
1936void
1937NetworkOPsImp::mapComplete(std::shared_ptr<SHAMap> const& map, bool fromAcquire)
1938{
1939 // We now have an additional transaction set
1940 // either created locally during the consensus process
1941 // or acquired from a peer
1942
1943 // Inform peers we have this set
1944 protocol::TMHaveTransactionSet msg;
1945 msg.set_hash(map->getHash().as_uint256().begin(), 256 / 8);
1946 msg.set_status(protocol::tsHAVE);
1947 registry_.overlay().foreach(send_always(std::make_shared<Message>(msg, protocol::mtHAVE_SET)));
1948
1949 // We acquired it because consensus asked us to
1950 if (fromAcquire)
1951 mConsensus.gotTxSet(registry_.timeKeeper().closeTime(), RCLTxSet{map});
1952}
1953
1954void
1955NetworkOPsImp::endConsensus(std::unique_ptr<std::stringstream> const& clog)
1956{
1957 uint256 deadLedger = m_ledgerMaster.getClosedLedger()->header().parentHash;
1958
1959 for (auto const& it : registry_.overlay().getActivePeers())
1960 {
1961 if (it && (it->getClosedLedgerHash() == deadLedger))
1962 {
1963 JLOG(m_journal.trace()) << "Killing obsolete peer status";
1964 it->cycleStatus();
1965 }
1966 }
1967
1968 uint256 networkClosed;
1969 bool ledgerChange = checkLastClosedLedger(registry_.overlay().getActivePeers(), networkClosed);
1970
1971 if (networkClosed.isZero())
1972 {
1973 CLOG(clog) << "endConsensus last closed ledger is zero. ";
1974 return;
1975 }
1976
1977 // WRITEME: Unless we are in FULL and in the process of doing a consensus,
1978 // we must count how many nodes share our LCL, how many nodes disagree with
1979 // our LCL, and how many validations our LCL has. We also want to check
1980 // timing to make sure there shouldn't be a newer LCL. We need this
1981 // information to do the next three tests.
1982
1983 if (((mMode == OperatingMode::CONNECTED) || (mMode == OperatingMode::SYNCING)) && !ledgerChange)
1984 {
1985 // Count number of peers that agree with us and UNL nodes whose
1986 // validations we have for LCL. If the ledger is good enough, go to
1987 // TRACKING - TODO
1988 if (!needNetworkLedger_)
1989 setMode(OperatingMode::TRACKING);
1990 }
1991
1992 if (((mMode == OperatingMode::CONNECTED) || (mMode == OperatingMode::TRACKING)) && !ledgerChange)
1993 {
1994 // check if the ledger is good enough to go to FULL
1995 // Note: Do not go to FULL if we don't have the previous ledger
1996 // check if the ledger is bad enough to go to CONNECTED -- TODO
1997 auto current = m_ledgerMaster.getCurrentLedger();
1998 if (registry_.timeKeeper().now() <
1999 (current->header().parentCloseTime + 2 * current->header().closeTimeResolution))
2000 {
2001 setMode(OperatingMode::FULL);
2002 }
2003 }
2004
2005 beginConsensus(networkClosed, clog);
2006}
2007
2008void
2009NetworkOPsImp::consensusViewChange()
2010{
2011 if ((mMode == OperatingMode::FULL) || (mMode == OperatingMode::TRACKING))
2012 {
2013 setMode(OperatingMode::CONNECTED);
2014 }
2015}
2016
// Publish a received manifest to all "manifests" stream subscribers, pruning
// subscribers whose weak references have expired.
void
NetworkOPsImp::pubManifest(Manifest const& mo)
{
    // VFALCO consider std::shared_mutex
    std::lock_guard sl(mSubLock);

    if (!mStreamMaps[sManifests].empty())
    {

        jvObj[jss::type] = "manifestReceived";
        jvObj[jss::master_key] = toBase58(TokenType::NodePublic, mo.masterKey);
        if (mo.signingKey)
            jvObj[jss::signing_key] = toBase58(TokenType::NodePublic, *mo.signingKey);
        jvObj[jss::seq] = Json::UInt(mo.sequence);
        if (auto sig = mo.getSignature())
            jvObj[jss::signature] = strHex(*sig);
        jvObj[jss::master_signature] = strHex(mo.getMasterSignature());
        if (!mo.domain.empty())
            jvObj[jss::domain] = mo.domain;
        jvObj[jss::manifest] = strHex(mo.serialized);

        // Send to each live subscriber; erase entries whose InfoSub is gone.
        for (auto i = mStreamMaps[sManifests].begin(); i != mStreamMaps[sManifests].end();)
        {
            if (auto p = i->second.lock())
            {
                p->send(jvObj, true);
                ++i;
            }
            else
            {
                i = mStreamMaps[sManifests].erase(i);
            }
        }
    }
}
2053
// Snapshot of the current fee environment: the server load factor and load
// base from the LoadFeeTrack, the supplied base fee, and the TxQ escalation
// metrics (taken by move).
NetworkOPsImp::ServerFeeSummary::ServerFeeSummary(
    XRPAmount fee,
    TxQ::Metrics&& escalationMetrics,
    LoadFeeTrack const& loadFeeTrack)
    : loadFactorServer{loadFeeTrack.getLoadFactor()}
    , loadBaseServer{loadFeeTrack.getLoadBase()}
    , baseFee{fee}
    , em{std::move(escalationMetrics)}
{
}
2064
// Inequality for ServerFeeSummary fee snapshots.
bool
{
    // Different if any scalar field differs, or if only one side carries
    // escalation metrics.
    if (loadFactorServer != b.loadFactorServer || loadBaseServer != b.loadBaseServer || baseFee != b.baseFee ||
        em.has_value() != b.em.has_value())
        return true;

    // Both sides have metrics: compare the three published fee levels.
    if (em && b.em)
    {
        return (
            em->minProcessingFeeLevel != b.em->minProcessingFeeLevel ||
            em->openLedgerFeeLevel != b.em->openLedgerFeeLevel || em->referenceFeeLevel != b.em->referenceFeeLevel);
    }

    return false;
}
2081
// Need to cap uint64 values to uint32 due to JSON limitations
static std::uint32_t
{

    // Clamp (not truncate) to the 32-bit maximum.
    return std::min(max32, v);
};
2090
// Publish the current server status (operating mode, load factors, fees) to
// all "server" stream subscribers, pruning expired subscribers.
void
{
    // VFALCO TODO Don't hold the lock across calls to send...make a copy of the
    //             list into a local array while holding the lock then release
    //             the lock and call send on everyone.
    //

    if (!mStreamMaps[sServer].empty())
    {

            registry_.openLedger().current()->fees().base,

        jvObj[jss::type] = "serverStatus";
        jvObj[jss::server_status] = strOperatingMode();
        jvObj[jss::load_base] = f.loadBaseServer;
        jvObj[jss::load_factor_server] = f.loadFactorServer;
        jvObj[jss::base_fee] = f.baseFee.jsonClipped();

        if (f.em)
        {
            // Effective load factor is the larger of the server load and the
            // escalated open-ledger fee level (scaled to the load base).
            auto const loadFactor = std::max(
                safe_cast<std::uint64_t>(f.loadFactorServer),
                mulDiv(f.em->openLedgerFeeLevel, f.loadBaseServer, f.em->referenceFeeLevel).value_or(xrpl::muldiv_max));

            jvObj[jss::load_factor] = trunc32(loadFactor);
            jvObj[jss::load_factor_fee_escalation] = f.em->openLedgerFeeLevel.jsonClipped();
            jvObj[jss::load_factor_fee_queue] = f.em->minProcessingFeeLevel.jsonClipped();
            jvObj[jss::load_factor_fee_reference] = f.em->referenceFeeLevel.jsonClipped();
        }
        else
            jvObj[jss::load_factor] = f.loadFactorServer;

        // Remember what we published for fee-change detection.
        mLastFeeSummary = f;

        for (auto i = mStreamMaps[sServer].begin(); i != mStreamMaps[sServer].end();)
        {
            InfoSub::pointer p = i->second.lock();

            // VFALCO TODO research the possibility of using thread queues and
            //             linearizing the deletion of subscribers with the
            //             sending of JSON data.
            if (p)
            {
                p->send(jvObj, true);
                ++i;
            }
            else
            {
                i = mStreamMaps[sServer].erase(i);
            }
        }
    }
}
2150
// Publish a consensus phase change to all "consensus" stream subscribers,
// pruning expired subscribers.
void
{

    auto& streamMap = mStreamMaps[sConsensusPhase];
    if (!streamMap.empty())
    {
        jvObj[jss::type] = "consensusPhase";
        jvObj[jss::consensus] = to_string(phase);

        for (auto i = streamMap.begin(); i != streamMap.end();)
        {
            if (auto p = i->second.lock())
            {
                p->send(jvObj, true);
                ++i;
            }
            else
            {
                i = streamMap.erase(i);
            }
        }
    }
}
2177
// Publish a received validation to all "validations" stream subscribers.
// Builds one JSON object, then uses MultiApiJson to emit per-API-version
// variants (ledger_index is a string in API v1, a number in later versions).
void
{
    // VFALCO consider std::shared_mutex

    if (!mStreamMaps[sValidations].empty())
    {

        auto const signerPublic = val->getSignerPublic();

        jvObj[jss::type] = "validationReceived";
        jvObj[jss::validation_public_key] = toBase58(TokenType::NodePublic, signerPublic);
        jvObj[jss::ledger_hash] = to_string(val->getLedgerHash());
        jvObj[jss::signature] = strHex(val->getSignature());
        jvObj[jss::full] = val->isFull();
        jvObj[jss::flags] = val->getFlags();
        jvObj[jss::signing_time] = *(*val)[~sfSigningTime];
        jvObj[jss::data] = strHex(val->getSerializer().slice());
        jvObj[jss::network_id] = registry_.app().config().NETWORK_ID;

        if (auto version = (*val)[~sfServerVersion])
            jvObj[jss::server_version] = std::to_string(*version);

        if (auto cookie = (*val)[~sfCookie])
            jvObj[jss::cookie] = std::to_string(*cookie);

        if (auto hash = (*val)[~sfValidatedHash])
            jvObj[jss::validated_hash] = strHex(*hash);

        // Report the master key only when it differs from the signing key.
        auto const masterKey = registry_.validatorManifests().getMasterKey(signerPublic);

        if (masterKey != signerPublic)
            jvObj[jss::master_key] = toBase58(TokenType::NodePublic, masterKey);

        // NOTE *seq is a number, but old API versions used string. We replace
        // number with a string using MultiApiJson near end of this function
        if (auto const seq = (*val)[~sfLedgerSequence])
            jvObj[jss::ledger_index] = *seq;

        if (val->isFieldPresent(sfAmendments))
        {
            jvObj[jss::amendments] = Json::Value(Json::arrayValue);
            for (auto const& amendment : val->getFieldV256(sfAmendments))
                jvObj[jss::amendments].append(to_string(amendment));
        }

        if (auto const closeTime = (*val)[~sfCloseTime])
            jvObj[jss::close_time] = *closeTime;

        if (auto const loadFee = (*val)[~sfLoadFee])
            jvObj[jss::load_fee] = *loadFee;

        if (auto const baseFee = val->at(~sfBaseFee))
            jvObj[jss::base_fee] = static_cast<double>(*baseFee);

        if (auto const reserveBase = val->at(~sfReserveBase))
            jvObj[jss::reserve_base] = *reserveBase;

        if (auto const reserveInc = val->at(~sfReserveIncrement))
            jvObj[jss::reserve_inc] = *reserveInc;

        // (The ~ operator converts the Proxy to a std::optional, which
        // simplifies later operations)
        // The *Drops fields, when present and native, override the legacy
        // fee/reserve values set above.
        if (auto const baseFeeXRP = ~val->at(~sfBaseFeeDrops); baseFeeXRP && baseFeeXRP->native())
            jvObj[jss::base_fee] = baseFeeXRP->xrp().jsonClipped();

        if (auto const reserveBaseXRP = ~val->at(~sfReserveBaseDrops); reserveBaseXRP && reserveBaseXRP->native())
            jvObj[jss::reserve_base] = reserveBaseXRP->xrp().jsonClipped();

        if (auto const reserveIncXRP = ~val->at(~sfReserveIncrementDrops); reserveIncXRP && reserveIncXRP->native())
            jvObj[jss::reserve_inc] = reserveIncXRP->xrp().jsonClipped();

        // NOTE Use MultiApiJson to publish two slightly different JSON objects
        // for consumers supporting different API versions
        MultiApiJson multiObj{jvObj};
        multiObj.visit(
            RPC::apiVersion<1>, //
            [](Json::Value& jvTx) {
                // Type conversion for older API versions to string
                if (jvTx.isMember(jss::ledger_index))
                {
                    jvTx[jss::ledger_index] = std::to_string(jvTx[jss::ledger_index].asUInt());
                }
            });

        // Send each subscriber the variant matching its API version; prune
        // expired subscribers.
        for (auto i = mStreamMaps[sValidations].begin(); i != mStreamMaps[sValidations].end();)
        {
            if (auto p = i->second.lock())
            {
                multiObj.visit(
                    p->getApiVersion(), //
                    [&](Json::Value const& jv) { p->send(jv, true); });
                ++i;
            }
            else
            {
                i = mStreamMaps[sValidations].erase(i);
            }
        }
    }
}
2281
// Publish a peer status change to all "peer_status" stream subscribers. The
// JSON payload is produced lazily by `func` (only called when there is at
// least one subscriber); expired subscribers are pruned.
void
{

    if (!mStreamMaps[sPeerStatus].empty())
    {
        Json::Value jvObj(func());

        jvObj[jss::type] = "peerStatusChange";

        for (auto i = mStreamMaps[sPeerStatus].begin(); i != mStreamMaps[sPeerStatus].end();)
        {
            InfoSub::pointer p = i->second.lock();

            if (p)
            {
                p->send(jvObj, true);
                ++i;
            }
            else
            {
                i = mStreamMaps[sPeerStatus].erase(i);
            }
        }
    }
}
2309
// Transition the server's operating mode, keeping the accounting state in
// sync and publishing the change to "server" stream subscribers.
void
{
    using namespace std::chrono_literals;
    if (om == OperatingMode::CONNECTED)
    {
    }
    else if (om == OperatingMode::SYNCING)
    {
    }

    // A blocked server (amendment or UNL) may not rise above CONNECTED.
    if ((om > OperatingMode::CONNECTED) && isBlocked())

    // No-op if the mode is unchanged.
    if (mMode == om)
        return;

    mMode = om;

    accounting_.mode(om);

    JLOG(m_journal.info()) << "STATE->" << strOperatingMode();
    pubServer();
}
2338
// Handle a validation received from `source`. Uses pendingValidations_ to
// detect a validation for the same ledger already being processed (in which
// case ledger acceptance is bypassed), publishes the validation to
// subscribers, and returns whether it should be relayed to peers.
bool
{
    JLOG(m_journal.trace()) << "recvValidation " << val->getLedgerHash() << " from " << source;

    BypassAccept bypassAccept = BypassAccept::no;
    try
    {
        // If this ledger hash is already being handled, skip the accept path.
        if (pendingValidations_.contains(val->getLedgerHash()))
            bypassAccept = BypassAccept::yes;
        else
            pendingValidations_.insert(val->getLedgerHash());
        // Release the lock while the (potentially slow) handler runs; it is
        // re-acquired when `unlock` goes out of scope.
        scope_unlock unlock(lock);
        handleNewValidation(registry_.app(), val, source, bypassAccept, m_journal);
    }
    catch (std::exception const& e)
    {
        JLOG(m_journal.warn()) << "Exception thrown for handling new validation " << val->getLedgerHash() << ": "
                               << e.what();
    }
    catch (...)
    {
        JLOG(m_journal.warn()) << "Unknown exception thrown for handling new validation " << val->getLedgerHash();
    }
    // Only the thread that inserted the hash removes it.
    if (bypassAccept == BypassAccept::no)
    {
        pendingValidations_.erase(val->getLedgerHash());
    }
    lock.unlock();

    pubValidation(val);

    JLOG(m_journal.debug()) << [this, &val]() -> auto {
        ss << "VALIDATION: " << val->render() << " master_key: ";
        auto master = registry_.validators().getTrustedKey(val->getSignerPublic());
        if (master)
        {
            ss << toBase58(TokenType::NodePublic, *master);
        }
        else
        {
            ss << "none";
        }
        return ss.str();
    }();

    // We will always relay trusted validations; if configured, we will
    // also relay all untrusted validations.
    return registry_.app().config().RELAY_UNTRUSTED_VALIDATIONS == 1 || val->isTrusted();
}
2391
{
    // Full (verbose) snapshot of the consensus engine's state.
    return mConsensus.getJson(true);
}
2397
2399NetworkOPsImp::getServerInfo(bool human, bool admin, bool counters)
2400{
2402
2403 // System-level warnings
2404 {
2405 Json::Value warnings{Json::arrayValue};
2406 if (isAmendmentBlocked())
2407 {
2408 Json::Value& w = warnings.append(Json::objectValue);
2409 w[jss::id] = warnRPC_AMENDMENT_BLOCKED;
2410 w[jss::message] =
2411 "This server is amendment blocked, and must be updated to be "
2412 "able to stay in sync with the network.";
2413 }
2414 if (isUNLBlocked())
2415 {
2416 Json::Value& w = warnings.append(Json::objectValue);
2417 w[jss::id] = warnRPC_EXPIRED_VALIDATOR_LIST;
2418 w[jss::message] =
2419 "This server has an expired validator list. validators.txt "
2420 "may be incorrectly configured or some [validator_list_sites] "
2421 "may be unreachable.";
2422 }
2423 if (admin && isAmendmentWarned())
2424 {
2425 Json::Value& w = warnings.append(Json::objectValue);
2426 w[jss::id] = warnRPC_UNSUPPORTED_MAJORITY;
2427 w[jss::message] =
2428 "One or more unsupported amendments have reached majority. "
2429 "Upgrade to the latest version before they are activated "
2430 "to avoid being amendment blocked.";
2431 if (auto const expected = registry_.getAmendmentTable().firstUnsupportedExpected())
2432 {
2433 auto& d = w[jss::details] = Json::objectValue;
2434 d[jss::expected_date] = expected->time_since_epoch().count();
2435 d[jss::expected_date_UTC] = to_string(*expected);
2436 }
2437 }
2438
2439 if (warnings.size())
2440 info[jss::warnings] = std::move(warnings);
2441 }
2442
2443 // hostid: unique string describing the machine
2444 if (human)
2445 info[jss::hostid] = getHostId(admin);
2446
2447 // domain: if configured with a domain, report it:
2449 info[jss::server_domain] = registry_.app().config().SERVER_DOMAIN;
2450
2451 info[jss::build_version] = BuildInfo::getVersionString();
2452
2453 info[jss::server_state] = strOperatingMode(admin);
2454
2455 info[jss::time] = to_string(std::chrono::floor<std::chrono::microseconds>(std::chrono::system_clock::now()));
2456
2458 info[jss::network_ledger] = "waiting";
2459
2460 info[jss::validation_quorum] = static_cast<Json::UInt>(registry_.validators().quorum());
2461
2462 if (admin)
2463 {
2464 switch (registry_.app().config().NODE_SIZE)
2465 {
2466 case 0:
2467 info[jss::node_size] = "tiny";
2468 break;
2469 case 1:
2470 info[jss::node_size] = "small";
2471 break;
2472 case 2:
2473 info[jss::node_size] = "medium";
2474 break;
2475 case 3:
2476 info[jss::node_size] = "large";
2477 break;
2478 case 4:
2479 info[jss::node_size] = "huge";
2480 break;
2481 }
2482
2483 auto when = registry_.validators().expires();
2484
2485 if (!human)
2486 {
2487 if (when)
2488 info[jss::validator_list_expires] = safe_cast<Json::UInt>(when->time_since_epoch().count());
2489 else
2490 info[jss::validator_list_expires] = 0;
2491 }
2492 else
2493 {
2494 auto& x = (info[jss::validator_list] = Json::objectValue);
2495
2496 x[jss::count] = static_cast<Json::UInt>(registry_.validators().count());
2497
2498 if (when)
2499 {
2500 if (*when == TimeKeeper::time_point::max())
2501 {
2502 x[jss::expiration] = "never";
2503 x[jss::status] = "active";
2504 }
2505 else
2506 {
2507 x[jss::expiration] = to_string(*when);
2508
2509 if (*when > registry_.timeKeeper().now())
2510 x[jss::status] = "active";
2511 else
2512 x[jss::status] = "expired";
2513 }
2514 }
2515 else
2516 {
2517 x[jss::status] = "unknown";
2518 x[jss::expiration] = "unknown";
2519 }
2520 }
2521
2522#if defined(GIT_COMMIT_HASH) || defined(GIT_BRANCH)
2523 {
2524 auto& x = (info[jss::git] = Json::objectValue);
2525#ifdef GIT_COMMIT_HASH
2526 x[jss::hash] = GIT_COMMIT_HASH;
2527#endif
2528#ifdef GIT_BRANCH
2529 x[jss::branch] = GIT_BRANCH;
2530#endif
2531 }
2532#endif
2533 }
2534 info[jss::io_latency_ms] = static_cast<Json::UInt>(registry_.app().getIOLatency().count());
2535
2536 if (admin)
2537 {
2538 if (auto const localPubKey = registry_.validators().localPublicKey();
2539 localPubKey && registry_.app().getValidationPublicKey())
2540 {
2541 info[jss::pubkey_validator] = toBase58(TokenType::NodePublic, localPubKey.value());
2542 }
2543 else
2544 {
2545 info[jss::pubkey_validator] = "none";
2546 }
2547 }
2548
2549 if (counters)
2550 {
2551 info[jss::counters] = registry_.getPerfLog().countersJson();
2552
2553 Json::Value nodestore(Json::objectValue);
2555 info[jss::counters][jss::nodestore] = nodestore;
2556 info[jss::current_activities] = registry_.getPerfLog().currentJson();
2557 }
2558
2559 info[jss::pubkey_node] = toBase58(TokenType::NodePublic, registry_.app().nodeIdentity().first);
2560
2561 info[jss::complete_ledgers] = registry_.getLedgerMaster().getCompleteLedgers();
2562
2564 info[jss::amendment_blocked] = true;
2565
2566 auto const fp = m_ledgerMaster.getFetchPackCacheSize();
2567
2568 if (fp != 0)
2569 info[jss::fetch_pack] = Json::UInt(fp);
2570
2571 info[jss::peers] = Json::UInt(registry_.overlay().size());
2572
2573 Json::Value lastClose = Json::objectValue;
2574 lastClose[jss::proposers] = Json::UInt(mConsensus.prevProposers());
2575
2576 if (human)
2577 {
2578 lastClose[jss::converge_time_s] = std::chrono::duration<double>{mConsensus.prevRoundTime()}.count();
2579 }
2580 else
2581 {
2582 lastClose[jss::converge_time] = Json::Int(mConsensus.prevRoundTime().count());
2583 }
2584
2585 info[jss::last_close] = lastClose;
2586
2587 // info[jss::consensus] = mConsensus.getJson();
2588
2589 if (admin)
2590 info[jss::load] = m_job_queue.getJson();
2591
2592 if (auto const netid = registry_.overlay().networkID())
2593 info[jss::network_id] = static_cast<Json::UInt>(*netid);
2594
2595 auto const escalationMetrics = registry_.getTxQ().getMetrics(*registry_.openLedger().current());
2596
2597 auto const loadFactorServer = registry_.getFeeTrack().getLoadFactor();
2598 auto const loadBaseServer = registry_.getFeeTrack().getLoadBase();
2599 /* Scale the escalated fee level to unitless "load factor".
2600 In practice, this just strips the units, but it will continue
2601 to work correctly if either base value ever changes. */
2602 auto const loadFactorFeeEscalation =
2603 mulDiv(escalationMetrics.openLedgerFeeLevel, loadBaseServer, escalationMetrics.referenceFeeLevel)
2605
2606 auto const loadFactor = std::max(safe_cast<std::uint64_t>(loadFactorServer), loadFactorFeeEscalation);
2607
2608 if (!human)
2609 {
2610 info[jss::load_base] = loadBaseServer;
2611 info[jss::load_factor] = trunc32(loadFactor);
2612 info[jss::load_factor_server] = loadFactorServer;
2613
2614 /* Json::Value doesn't support uint64, so clamp to max
2615 uint32 value. This is mostly theoretical, since there
2616 probably isn't enough extant XRP to drive the factor
2617 that high.
2618 */
2619 info[jss::load_factor_fee_escalation] = escalationMetrics.openLedgerFeeLevel.jsonClipped();
2620 info[jss::load_factor_fee_queue] = escalationMetrics.minProcessingFeeLevel.jsonClipped();
2621 info[jss::load_factor_fee_reference] = escalationMetrics.referenceFeeLevel.jsonClipped();
2622 }
2623 else
2624 {
2625 info[jss::load_factor] = static_cast<double>(loadFactor) / loadBaseServer;
2626
2627 if (loadFactorServer != loadFactor)
2628 info[jss::load_factor_server] = static_cast<double>(loadFactorServer) / loadBaseServer;
2629
2630 if (admin)
2631 {
2633 if (fee != loadBaseServer)
2634 info[jss::load_factor_local] = static_cast<double>(fee) / loadBaseServer;
2636 if (fee != loadBaseServer)
2637 info[jss::load_factor_net] = static_cast<double>(fee) / loadBaseServer;
2639 if (fee != loadBaseServer)
2640 info[jss::load_factor_cluster] = static_cast<double>(fee) / loadBaseServer;
2641 }
2642 if (escalationMetrics.openLedgerFeeLevel != escalationMetrics.referenceFeeLevel &&
2643 (admin || loadFactorFeeEscalation != loadFactor))
2644 info[jss::load_factor_fee_escalation] =
2645 escalationMetrics.openLedgerFeeLevel.decimalFromReference(escalationMetrics.referenceFeeLevel);
2646 if (escalationMetrics.minProcessingFeeLevel != escalationMetrics.referenceFeeLevel)
2647 info[jss::load_factor_fee_queue] =
2648 escalationMetrics.minProcessingFeeLevel.decimalFromReference(escalationMetrics.referenceFeeLevel);
2649 }
2650
2651 bool valid = false;
2652 auto lpClosed = m_ledgerMaster.getValidatedLedger();
2653
2654 if (lpClosed)
2655 valid = true;
2656 else
2657 lpClosed = m_ledgerMaster.getClosedLedger();
2658
2659 if (lpClosed)
2660 {
2661 XRPAmount const baseFee = lpClosed->fees().base;
2663 l[jss::seq] = Json::UInt(lpClosed->header().seq);
2664 l[jss::hash] = to_string(lpClosed->header().hash);
2665
2666 if (!human)
2667 {
2668 l[jss::base_fee] = baseFee.jsonClipped();
2669 l[jss::reserve_base] = lpClosed->fees().reserve.jsonClipped();
2670 l[jss::reserve_inc] = lpClosed->fees().increment.jsonClipped();
2671 l[jss::close_time] = Json::Value::UInt(lpClosed->header().closeTime.time_since_epoch().count());
2672 }
2673 else
2674 {
2675 l[jss::base_fee_xrp] = baseFee.decimalXRP();
2676 l[jss::reserve_base_xrp] = lpClosed->fees().reserve.decimalXRP();
2677 l[jss::reserve_inc_xrp] = lpClosed->fees().increment.decimalXRP();
2678
2679 if (auto const closeOffset = registry_.timeKeeper().closeOffset(); std::abs(closeOffset.count()) >= 60)
2680 l[jss::close_time_offset] = static_cast<std::uint32_t>(closeOffset.count());
2681
2682 constexpr std::chrono::seconds highAgeThreshold{1000000};
2684 {
2685 auto const age = m_ledgerMaster.getValidatedLedgerAge();
2686 l[jss::age] = Json::UInt(age < highAgeThreshold ? age.count() : 0);
2687 }
2688 else
2689 {
2690 auto lCloseTime = lpClosed->header().closeTime;
2691 auto closeTime = registry_.timeKeeper().closeTime();
2692 if (lCloseTime <= closeTime)
2693 {
2694 using namespace std::chrono_literals;
2695 auto age = closeTime - lCloseTime;
2696 l[jss::age] = Json::UInt(age < highAgeThreshold ? age.count() : 0);
2697 }
2698 }
2699 }
2700
2701 if (valid)
2702 info[jss::validated_ledger] = l;
2703 else
2704 info[jss::closed_ledger] = l;
2705
2706 auto lpPublished = m_ledgerMaster.getPublishedLedger();
2707 if (!lpPublished)
2708 info[jss::published_ledger] = "none";
2709 else if (lpPublished->header().seq != lpClosed->header().seq)
2710 info[jss::published_ledger] = lpPublished->header().seq;
2711 }
2712
2713 accounting_.json(info);
2714 info[jss::uptime] = UptimeClock::now().time_since_epoch().count();
2715 info[jss::jq_trans_overflow] = std::to_string(registry_.overlay().getJqTransOverflow());
2716 info[jss::peer_disconnects] = std::to_string(registry_.overlay().getPeerDisconnect());
2717 info[jss::peer_disconnects_resources] = std::to_string(registry_.overlay().getPeerDisconnectCharges());
2718
2719 // This array must be sorted in increasing order.
2720 static constexpr std::array<std::string_view, 7> protocols{"http", "https", "peer", "ws", "ws2", "wss", "wss2"};
2721 static_assert(std::is_sorted(std::begin(protocols), std::end(protocols)));
2722 {
2724 for (auto const& port : registry_.getServerHandler().setup().ports)
2725 {
2726 // Don't publish admin ports for non-admin users
2727 if (!admin &&
2728 !(port.admin_nets_v4.empty() && port.admin_nets_v6.empty() && port.admin_user.empty() &&
2729 port.admin_password.empty()))
2730 continue;
2733 std::begin(port.protocol),
2734 std::end(port.protocol),
2735 std::begin(protocols),
2736 std::end(protocols),
2737 std::back_inserter(proto));
2738 if (!proto.empty())
2739 {
2740 auto& jv = ports.append(Json::Value(Json::objectValue));
2741 jv[jss::port] = std::to_string(port.port);
2742 jv[jss::protocol] = Json::Value{Json::arrayValue};
2743 for (auto const& p : proto)
2744 jv[jss::protocol].append(p);
2745 }
2746 }
2747
2748 if (registry_.app().config().exists(SECTION_PORT_GRPC))
2749 {
2750 auto const& grpcSection = registry_.app().config().section(SECTION_PORT_GRPC);
2751 auto const optPort = grpcSection.get("port");
2752 if (optPort && grpcSection.get("ip"))
2753 {
2754 auto& jv = ports.append(Json::Value(Json::objectValue));
2755 jv[jss::port] = *optPort;
2756 jv[jss::protocol] = Json::Value{Json::arrayValue};
2757 jv[jss::protocol].append("grpc");
2758 }
2759 }
2760 info[jss::ports] = std::move(ports);
2761 }
2762
2763 return info;
2764}
2765
2766void
2771
2777
// Publish a proposed (not-yet-validated) transaction to every
// subscriber of the real-time transactions stream (sRTTransactions),
// then fan out to per-account proposed-transaction subscribers.
// Expired (lock-failed) weak subscriber entries are pruned in place.
// NOTE(review): lossy capture — orig. line 2779 (the
// `NetworkOPsImp::pubProposedTransaction(` name line) and orig. line
// 2795's predecessor 2794 (presumably the mSubLock lock guard for the
// stream-map scope) are missing from this listing.
2778void
2780    std::shared_ptr<ReadView const> const& ledger,
2781    std::shared_ptr<STTx const> const& transaction,
2782    TER result)
2783{
2784    // never publish an inner txn inside a batch txn. The flag should
2785    // only be set if the Batch feature is enabled. If Batch is not
2786    // enabled, the flag is always invalid, so don't publish it
2787    // regardless.
2788    if (transaction->isFlag(tfInnerBatchTxn))
2789        return;
2790
2791    MultiApiJson jvObj = transJson(transaction, result, false, ledger, std::nullopt);
2792
2793    {
2795
2796        auto it = mStreamMaps[sRTTransactions].begin();
2797        while (it != mStreamMaps[sRTTransactions].end())
2798        {
2799            InfoSub::pointer p = it->second.lock();
2800
2801            if (p)
2802            {
                    // Live subscriber: send the JSON shaped for its API version.
2803                jvObj.visit(
2804                    p->getApiVersion(), //
2805                    [&](Json::Value const& jv) { p->send(jv, true); });
2806                ++it;
2807            }
2808            else
2809            {
                    // Subscriber went away; drop the stale weak_ptr entry.
2810                it = mStreamMaps[sRTTransactions].erase(it);
2811            }
2812        }
2813    }
2814
2815    pubProposedAccountTransaction(ledger, transaction, result);
2816}
2817
2818void
2820{
2821 // Ledgers are published only when they acquire sufficient validations
2822 // Holes are filled across connection loss or other catastrophe
2823
2824 std::shared_ptr<AcceptedLedger> alpAccepted = registry_.getAcceptedLedgerCache().fetch(lpAccepted->header().hash);
2825 if (!alpAccepted)
2826 {
2827 alpAccepted = std::make_shared<AcceptedLedger>(lpAccepted);
2828 registry_.getAcceptedLedgerCache().canonicalize_replace_client(lpAccepted->header().hash, alpAccepted);
2829 }
2830
2831 XRPL_ASSERT(alpAccepted->getLedger().get() == lpAccepted.get(), "xrpl::NetworkOPsImp::pubLedger : accepted input");
2832
2833 {
2834 JLOG(m_journal.debug()) << "Publishing ledger " << lpAccepted->header().seq << " " << lpAccepted->header().hash;
2835
2837
2838 if (!mStreamMaps[sLedger].empty())
2839 {
2841
2842 jvObj[jss::type] = "ledgerClosed";
2843 jvObj[jss::ledger_index] = lpAccepted->header().seq;
2844 jvObj[jss::ledger_hash] = to_string(lpAccepted->header().hash);
2845 jvObj[jss::ledger_time] = Json::Value::UInt(lpAccepted->header().closeTime.time_since_epoch().count());
2846
2847 jvObj[jss::network_id] = registry_.app().config().NETWORK_ID;
2848
2849 if (!lpAccepted->rules().enabled(featureXRPFees))
2850 jvObj[jss::fee_ref] = Config::FEE_UNITS_DEPRECATED;
2851 jvObj[jss::fee_base] = lpAccepted->fees().base.jsonClipped();
2852 jvObj[jss::reserve_base] = lpAccepted->fees().reserve.jsonClipped();
2853 jvObj[jss::reserve_inc] = lpAccepted->fees().increment.jsonClipped();
2854
2855 jvObj[jss::txn_count] = Json::UInt(alpAccepted->size());
2856
2858 {
2859 jvObj[jss::validated_ledgers] = registry_.getLedgerMaster().getCompleteLedgers();
2860 }
2861
2862 auto it = mStreamMaps[sLedger].begin();
2863 while (it != mStreamMaps[sLedger].end())
2864 {
2865 InfoSub::pointer p = it->second.lock();
2866 if (p)
2867 {
2868 p->send(jvObj, true);
2869 ++it;
2870 }
2871 else
2872 it = mStreamMaps[sLedger].erase(it);
2873 }
2874 }
2875
2876 if (!mStreamMaps[sBookChanges].empty())
2877 {
2878 Json::Value jvObj = xrpl::RPC::computeBookChanges(lpAccepted);
2879
2880 auto it = mStreamMaps[sBookChanges].begin();
2881 while (it != mStreamMaps[sBookChanges].end())
2882 {
2883 InfoSub::pointer p = it->second.lock();
2884 if (p)
2885 {
2886 p->send(jvObj, true);
2887 ++it;
2888 }
2889 else
2890 it = mStreamMaps[sBookChanges].erase(it);
2891 }
2892 }
2893
2894 {
2895 static bool firstTime = true;
2896 if (firstTime)
2897 {
2898 // First validated ledger, start delayed SubAccountHistory
2899 firstTime = false;
2900 for (auto& outer : mSubAccountHistory)
2901 {
2902 for (auto& inner : outer.second)
2903 {
2904 auto& subInfo = inner.second;
2905 if (subInfo.index_->separationLedgerSeq_ == 0)
2906 {
2907 subAccountHistoryStart(alpAccepted->getLedger(), subInfo);
2908 }
2909 }
2910 }
2911 }
2912 }
2913 }
2914
2915 // Don't lock since pubAcceptedTransaction is locking.
2916 for (auto const& accTx : *alpAccepted)
2917 {
2918 JLOG(m_journal.trace()) << "pubAccepted: " << accTx->getJson();
2919 pubValidatedTransaction(lpAccepted, *accTx, accTx == *(--alpAccepted->end()));
2920 }
2921}
2922
// Detect a change in the server's fee summary and, only when it
// differs from the last published summary, schedule an async job that
// re-publishes server state to "server" stream subscribers.
// NOTE(review): lossy capture — orig. lines 2924 (name line) and
// 2926/2928-2929 (the rest of the `ServerFeeSummary f{...}`
// construction around the visible base-fee argument) are missing from
// this listing; confirm against the repository before editing.
2923void
2925{
2927        registry_.openLedger().current()->fees().base,
2930
2931    // only schedule the job if something has changed
2932    if (f != mLastFeeSummary)
2933    {
2934        m_job_queue.addJob(jtCLIENT_FEE_CHANGE, "PubFee", [this]() { pubServer(); });
2935    }
2936}
2937
// Queue an async client job that publishes the new consensus phase to
// "consensus" stream subscribers; the phase is captured by value so
// the job is safe to run after this call returns.
// NOTE(review): lossy capture — orig. line 2939 (presumably
// `NetworkOPsImp::reportConsensusStateChange(ConsensusPhase phase)`)
// is missing from this listing.
2938void
2940{
2941    m_job_queue.addJob(jtCLIENT_CONSENSUS, "PubCons", [this, phase]() { pubConsensus(phase); });
2942}
2943
// Forward to the local-transaction set's sweep, discarding entries
// that are no longer relevant relative to the given view.
// NOTE(review): lossy capture — orig. line 2945 (the function name /
// parameter line declaring `view`) is missing from this listing.
2944inline void
2946{
2947    m_localTX->sweep(view);
2948}
// Number of transactions currently held in the local-transaction set.
// NOTE(review): lossy capture — orig. line 2950 (the function name
// line) is missing from this listing.
2949inline std::size_t
2951{
2952    return m_localTX->size();
2953}
2954
2955// This routine should only be used to publish accepted or validated
2956// transactions.
2959 std::shared_ptr<STTx const> const& transaction,
2960 TER result,
2961 bool validated,
2962 std::shared_ptr<ReadView const> const& ledger,
2964{
2966 std::string sToken;
2967 std::string sHuman;
2968
2969 transResultInfo(result, sToken, sHuman);
2970
2971 jvObj[jss::type] = "transaction";
2972 // NOTE jvObj is not a finished object for either API version. After
2973 // it's populated, we need to finish it for a specific API version. This is
2974 // done in a loop, near the end of this function.
2975 jvObj[jss::transaction] = transaction->getJson(JsonOptions::disable_API_prior_V2, false);
2976
2977 if (meta)
2978 {
2979 jvObj[jss::meta] = meta->get().getJson(JsonOptions::none);
2980 RPC::insertDeliveredAmount(jvObj[jss::meta], *ledger, transaction, meta->get());
2981 RPC::insertNFTSyntheticInJson(jvObj, transaction, meta->get());
2982 RPC::insertMPTokenIssuanceID(jvObj[jss::meta], transaction, meta->get());
2983 }
2984
2985 // add CTID where the needed data for it exists
2986 if (auto const& lookup = ledger->txRead(transaction->getTransactionID());
2987 lookup.second && lookup.second->isFieldPresent(sfTransactionIndex))
2988 {
2989 uint32_t const txnSeq = lookup.second->getFieldU32(sfTransactionIndex);
2990 uint32_t netID = registry_.app().config().NETWORK_ID;
2991 if (transaction->isFieldPresent(sfNetworkID))
2992 netID = transaction->getFieldU32(sfNetworkID);
2993
2994 if (std::optional<std::string> ctid = RPC::encodeCTID(ledger->header().seq, txnSeq, netID); ctid)
2995 jvObj[jss::ctid] = *ctid;
2996 }
2997 if (!ledger->open())
2998 jvObj[jss::ledger_hash] = to_string(ledger->header().hash);
2999
3000 if (validated)
3001 {
3002 jvObj[jss::ledger_index] = ledger->header().seq;
3003 jvObj[jss::transaction][jss::date] = ledger->header().closeTime.time_since_epoch().count();
3004 jvObj[jss::validated] = true;
3005 jvObj[jss::close_time_iso] = to_string_iso(ledger->header().closeTime);
3006
3007 // WRITEME: Put the account next seq here
3008 }
3009 else
3010 {
3011 jvObj[jss::validated] = false;
3012 jvObj[jss::ledger_current_index] = ledger->header().seq;
3013 }
3014
3015 jvObj[jss::status] = validated ? "closed" : "proposed";
3016 jvObj[jss::engine_result] = sToken;
3017 jvObj[jss::engine_result_code] = result;
3018 jvObj[jss::engine_result_message] = sHuman;
3019
3020 if (transaction->getTxnType() == ttOFFER_CREATE)
3021 {
3022 auto const account = transaction->getAccountID(sfAccount);
3023 auto const amount = transaction->getFieldAmount(sfTakerGets);
3024
3025 // If the offer create is not self funded then add the owner balance
3026 if (account != amount.issue().account)
3027 {
3028 auto const ownerFunds = accountFunds(*ledger, account, amount, fhIGNORE_FREEZE, registry_.journal("View"));
3029 jvObj[jss::transaction][jss::owner_funds] = ownerFunds.getText();
3030 }
3031 }
3032
3033 std::string const hash = to_string(transaction->getTransactionID());
3034 MultiApiJson multiObj{jvObj};
3036 multiObj.visit(), //
3037 [&]<unsigned Version>(Json::Value& jvTx, std::integral_constant<unsigned, Version>) {
3038 RPC::insertDeliverMax(jvTx[jss::transaction], transaction->getTxnType(), Version);
3039
3040 if constexpr (Version > 1)
3041 {
3042 jvTx[jss::tx_json] = jvTx.removeMember(jss::transaction);
3043 jvTx[jss::hash] = hash;
3044 }
3045 else
3046 {
3047 jvTx[jss::transaction][jss::hash] = hash;
3048 }
3049 });
3050
3051 return multiObj;
3052}
3053
// Publish a validated transaction (with metadata) to both the
// "transactions" (sTransactions) and real-time (sRTTransactions)
// streams, pruning dead subscribers as we go; then notify the order
// book DB on success and fan out to per-account subscribers.
// `last` marks the final transaction of the ledger (used downstream
// as the account_history_boundary flag).
// NOTE(review): lossy capture — orig. line 3055 (the
// `NetworkOPsImp::pubValidatedTransaction(` name line) and orig. line
// 3068 (presumably the mSubLock lock guard for the stream-map scope)
// are missing from this listing.
3054void
3056    std::shared_ptr<ReadView const> const& ledger,
3057    AcceptedLedgerTx const& transaction,
3058    bool last)
3059{
3060    auto const& stTxn = transaction.getTxn();
3061
3062    // Create two different Json objects, for different API versions
3063    auto const metaRef = std::ref(transaction.getMeta());
3064    auto const trResult = transaction.getResult();
3065    MultiApiJson jvObj = transJson(stTxn, trResult, true, ledger, metaRef);
3066
3067    {
3069
3070        auto it = mStreamMaps[sTransactions].begin();
3071        while (it != mStreamMaps[sTransactions].end())
3072        {
3073            InfoSub::pointer p = it->second.lock();
3074
3075            if (p)
3076            {
3077                jvObj.visit(
3078                    p->getApiVersion(), //
3079                    [&](Json::Value const& jv) { p->send(jv, true); });
3080                ++it;
3081            }
3082            else
3083                it = mStreamMaps[sTransactions].erase(it);
3084        }
3085
            // Real-time subscribers also receive validated transactions.
3086        it = mStreamMaps[sRTTransactions].begin();
3087
3088        while (it != mStreamMaps[sRTTransactions].end())
3089        {
3090            InfoSub::pointer p = it->second.lock();
3091
3092            if (p)
3093            {
3094                jvObj.visit(
3095                    p->getApiVersion(), //
3096                    [&](Json::Value const& jv) { p->send(jv, true); });
3097                ++it;
3098            }
3099            else
3100                it = mStreamMaps[sRTTransactions].erase(it);
3101        }
3102    }
3103
        // Only successful transactions can affect order books.
3104    if (transaction.getResult() == tesSUCCESS)
3105        registry_.getOrderBookDB().processTxn(ledger, transaction, jvObj);
3106
3107    pubAccountTransaction(ledger, transaction, last);
3108}
3109
3110void
3112 std::shared_ptr<ReadView const> const& ledger,
3113 AcceptedLedgerTx const& transaction,
3114 bool last)
3115{
3117 int iProposed = 0;
3118 int iAccepted = 0;
3119
3120 std::vector<SubAccountHistoryInfo> accountHistoryNotify;
3121 auto const currLedgerSeq = ledger->seq();
3122 {
3124
3126 {
3127 for (auto const& affectedAccount : transaction.getAffected())
3128 {
3129 if (auto simiIt = mSubRTAccount.find(affectedAccount); simiIt != mSubRTAccount.end())
3130 {
3131 auto it = simiIt->second.begin();
3132
3133 while (it != simiIt->second.end())
3134 {
3135 InfoSub::pointer p = it->second.lock();
3136
3137 if (p)
3138 {
3139 notify.insert(p);
3140 ++it;
3141 ++iProposed;
3142 }
3143 else
3144 it = simiIt->second.erase(it);
3145 }
3146 }
3147
3148 if (auto simiIt = mSubAccount.find(affectedAccount); simiIt != mSubAccount.end())
3149 {
3150 auto it = simiIt->second.begin();
3151 while (it != simiIt->second.end())
3152 {
3153 InfoSub::pointer p = it->second.lock();
3154
3155 if (p)
3156 {
3157 notify.insert(p);
3158 ++it;
3159 ++iAccepted;
3160 }
3161 else
3162 it = simiIt->second.erase(it);
3163 }
3164 }
3165
3166 if (auto historyIt = mSubAccountHistory.find(affectedAccount); historyIt != mSubAccountHistory.end())
3167 {
3168 auto& subs = historyIt->second;
3169 auto it = subs.begin();
3170 while (it != subs.end())
3171 {
3172 SubAccountHistoryInfoWeak const& info = it->second;
3173 if (currLedgerSeq <= info.index_->separationLedgerSeq_)
3174 {
3175 ++it;
3176 continue;
3177 }
3178
3179 if (auto isSptr = info.sinkWptr_.lock(); isSptr)
3180 {
3181 accountHistoryNotify.emplace_back(SubAccountHistoryInfo{isSptr, info.index_});
3182 ++it;
3183 }
3184 else
3185 {
3186 it = subs.erase(it);
3187 }
3188 }
3189 if (subs.empty())
3190 mSubAccountHistory.erase(historyIt);
3191 }
3192 }
3193 }
3194 }
3195
3196 JLOG(m_journal.trace()) << "pubAccountTransaction: "
3197 << "proposed=" << iProposed << ", accepted=" << iAccepted;
3198
3199 if (!notify.empty() || !accountHistoryNotify.empty())
3200 {
3201 auto const& stTxn = transaction.getTxn();
3202
3203 // Create two different Json objects, for different API versions
3204 auto const metaRef = std::ref(transaction.getMeta());
3205 auto const trResult = transaction.getResult();
3206 MultiApiJson jvObj = transJson(stTxn, trResult, true, ledger, metaRef);
3207
3208 for (InfoSub::ref isrListener : notify)
3209 {
3210 jvObj.visit(
3211 isrListener->getApiVersion(), //
3212 [&](Json::Value const& jv) { isrListener->send(jv, true); });
3213 }
3214
3215 if (last)
3216 jvObj.set(jss::account_history_boundary, true);
3217
3218 XRPL_ASSERT(
3219 jvObj.isMember(jss::account_history_tx_stream) == MultiApiJson::none,
3220 "xrpl::NetworkOPsImp::pubAccountTransaction : "
3221 "account_history_tx_stream not set");
3222 for (auto& info : accountHistoryNotify)
3223 {
3224 auto& index = info.index_;
3225 if (index->forwardTxIndex_ == 0 && !index->haveHistorical_)
3226 jvObj.set(jss::account_history_tx_first, true);
3227
3228 jvObj.set(jss::account_history_tx_index, index->forwardTxIndex_++);
3229
3230 jvObj.visit(
3231 info.sink_->getApiVersion(), //
3232 [&](Json::Value const& jv) { info.sink_->send(jv, true); });
3233 }
3234 }
3235}
3236
3237void
3239 std::shared_ptr<ReadView const> const& ledger,
3241 TER result)
3242{
3244 int iProposed = 0;
3245
3246 std::vector<SubAccountHistoryInfo> accountHistoryNotify;
3247
3248 {
3250
3251 if (mSubRTAccount.empty())
3252 return;
3253
3255 {
3256 for (auto const& affectedAccount : tx->getMentionedAccounts())
3257 {
3258 if (auto simiIt = mSubRTAccount.find(affectedAccount); simiIt != mSubRTAccount.end())
3259 {
3260 auto it = simiIt->second.begin();
3261
3262 while (it != simiIt->second.end())
3263 {
3264 InfoSub::pointer p = it->second.lock();
3265
3266 if (p)
3267 {
3268 notify.insert(p);
3269 ++it;
3270 ++iProposed;
3271 }
3272 else
3273 it = simiIt->second.erase(it);
3274 }
3275 }
3276 }
3277 }
3278 }
3279
3280 JLOG(m_journal.trace()) << "pubProposedAccountTransaction: " << iProposed;
3281
3282 if (!notify.empty() || !accountHistoryNotify.empty())
3283 {
3284 // Create two different Json objects, for different API versions
3285 MultiApiJson jvObj = transJson(tx, result, false, ledger, std::nullopt);
3286
3287 for (InfoSub::ref isrListener : notify)
3288 jvObj.visit(
3289 isrListener->getApiVersion(), //
3290 [&](Json::Value const& jv) { isrListener->send(jv, true); });
3291
3292 XRPL_ASSERT(
3293 jvObj.isMember(jss::account_history_tx_stream) == MultiApiJson::none,
3294 "xrpl::NetworkOPs::pubProposedAccountTransaction : "
3295 "account_history_tx_stream not set");
3296 for (auto& info : accountHistoryNotify)
3297 {
3298 auto& index = info.index_;
3299 if (index->forwardTxIndex_ == 0 && !index->haveHistorical_)
3300 jvObj.set(jss::account_history_tx_first, true);
3301 jvObj.set(jss::account_history_tx_index, index->forwardTxIndex_++);
3302 jvObj.visit(
3303 info.sink_->getApiVersion(), //
3304 [&](Json::Value const& jv) { info.sink_->send(jv, true); });
3305 }
3306 }
3307}
3308
3309//
3310// Monitoring
3311//
3312
// Subscribe a listener to account notifications for each account in
// vnaAccountIDs. `rt` selects the real-time (proposed) map
// mSubRTAccount versus the validated map mSubAccount. The listener's
// own bookkeeping is updated first, then the server-side maps.
// NOTE(review): lossy capture — orig. line 3325 (presumably the
// mSubLock lock guard protecting the subscription maps) is missing
// from this listing.
3313void
3314NetworkOPsImp::subAccount(InfoSub::ref isrListener, hash_set<AccountID> const& vnaAccountIDs, bool rt)
3315{
3316    SubInfoMapType& subMap = rt ? mSubRTAccount : mSubAccount;
3317
3318    for (auto const& naAccountID : vnaAccountIDs)
3319    {
3320        JLOG(m_journal.trace()) << "subAccount: account: " << toBase58(naAccountID);
3321
3322        isrListener->insertSubAccountInfo(naAccountID, rt);
3323    }
3324
3326
3327    for (auto const& naAccountID : vnaAccountIDs)
3328    {
3329        auto simIterator = subMap.find(naAccountID);
3330        if (simIterator == subMap.end())
3331        {
3332            // Not found, note that account has a new single listener.
3333            SubMapType usisElement;
3334            usisElement[isrListener->getSeq()] = isrListener;
3335            // VFALCO NOTE This is making a needless copy of naAccountID
3336            subMap.insert(simIterator, make_pair(naAccountID, usisElement));
3337        }
3338        else
3339        {
3340            // Found, note that the account has another listener.
3341            simIterator->second[isrListener->getSeq()] = isrListener;
3342        }
3343    }
3344}
3345
3346void
3347NetworkOPsImp::unsubAccount(InfoSub::ref isrListener, hash_set<AccountID> const& vnaAccountIDs, bool rt)
3348{
3349 for (auto const& naAccountID : vnaAccountIDs)
3350 {
3351 // Remove from the InfoSub
3352 isrListener->deleteSubAccountInfo(naAccountID, rt);
3353 }
3354
3355 // Remove from the server
3356 unsubAccountInternal(isrListener->getSeq(), vnaAccountIDs, rt);
3357}
3358
// Remove subscriber `uSeq` from the per-account subscription map for
// every account in vnaAccountIDs, erasing an account's map entry
// entirely once its last subscriber is gone. `rt` selects the
// real-time map mSubRTAccount versus the validated map mSubAccount.
// NOTE(review): lossy capture — orig. lines 3360 (the function name /
// parameter line) and 3362 (presumably the mSubLock lock guard) are
// missing from this listing.
3359void
3361{
3363
3364    SubInfoMapType& subMap = rt ? mSubRTAccount : mSubAccount;
3365
3366    for (auto const& naAccountID : vnaAccountIDs)
3367    {
3368        auto simIterator = subMap.find(naAccountID);
3369
3370        if (simIterator != subMap.end())
3371        {
3372            // Found
3373            simIterator->second.erase(uSeq);
3374
3375            if (simIterator->second.empty())
3376            {
3377                // Don't need hash entry.
3378                subMap.erase(simIterator);
3379            }
3380        }
3381    }
3382}
3383
3384void
3386{
3387 enum DatabaseType { Sqlite, None };
3388 static auto const databaseType = [&]() -> DatabaseType {
3389 // Use a dynamic_cast to return DatabaseType::None
3390 // on failure.
3391 if (dynamic_cast<SQLiteDatabase*>(&registry_.getRelationalDatabase()))
3392 {
3393 return DatabaseType::Sqlite;
3394 }
3395 return DatabaseType::None;
3396 }();
3397
3398 if (databaseType == DatabaseType::None)
3399 {
3400 // LCOV_EXCL_START
3401 UNREACHABLE("xrpl::NetworkOPsImp::addAccountHistoryJob : no database");
3402 JLOG(m_journal.error()) << "AccountHistory job for account " << toBase58(subInfo.index_->accountId_)
3403 << " no database";
3404 if (auto sptr = subInfo.sinkWptr_.lock(); sptr)
3405 {
3406 sptr->send(rpcError(rpcINTERNAL), true);
3407 unsubAccountHistory(sptr, subInfo.index_->accountId_, false);
3408 }
3409 return;
3410 // LCOV_EXCL_STOP
3411 }
3412
3413 registry_.getJobQueue().addJob(jtCLIENT_ACCT_HIST, "HistTxStream", [this, dbType = databaseType, subInfo]() {
3414 auto const& accountId = subInfo.index_->accountId_;
3415 auto& lastLedgerSeq = subInfo.index_->historyLastLedgerSeq_;
3416 auto& txHistoryIndex = subInfo.index_->historyTxIndex_;
3417
3418 JLOG(m_journal.trace()) << "AccountHistory job for account " << toBase58(accountId)
3419 << " started. lastLedgerSeq=" << lastLedgerSeq;
3420
3421 auto isFirstTx = [&](std::shared_ptr<Transaction> const& tx, std::shared_ptr<TxMeta> const& meta) -> bool {
3422 /*
3423 * genesis account: first tx is the one with seq 1
3424 * other account: first tx is the one created the account
3425 */
3426 if (accountId == genesisAccountId)
3427 {
3428 auto stx = tx->getSTransaction();
3429 if (stx->getAccountID(sfAccount) == accountId && stx->getSeqValue() == 1)
3430 return true;
3431 }
3432
3433 for (auto& node : meta->getNodes())
3434 {
3435 if (node.getFieldU16(sfLedgerEntryType) != ltACCOUNT_ROOT)
3436 continue;
3437
3438 if (node.isFieldPresent(sfNewFields))
3439 {
3440 if (auto inner = dynamic_cast<STObject const*>(node.peekAtPField(sfNewFields)); inner)
3441 {
3442 if (inner->isFieldPresent(sfAccount) && inner->getAccountID(sfAccount) == accountId)
3443 {
3444 return true;
3445 }
3446 }
3447 }
3448 }
3449
3450 return false;
3451 };
3452
3453 auto send = [&](Json::Value const& jvObj, bool unsubscribe) -> bool {
3454 if (auto sptr = subInfo.sinkWptr_.lock())
3455 {
3456 sptr->send(jvObj, true);
3457 if (unsubscribe)
3458 unsubAccountHistory(sptr, accountId, false);
3459 return true;
3460 }
3461
3462 return false;
3463 };
3464
3465 auto sendMultiApiJson = [&](MultiApiJson const& jvObj, bool unsubscribe) -> bool {
3466 if (auto sptr = subInfo.sinkWptr_.lock())
3467 {
3468 jvObj.visit(
3469 sptr->getApiVersion(), //
3470 [&](Json::Value const& jv) { sptr->send(jv, true); });
3471
3472 if (unsubscribe)
3473 unsubAccountHistory(sptr, accountId, false);
3474 return true;
3475 }
3476
3477 return false;
3478 };
3479
3480 auto getMoreTxns = [&](std::uint32_t minLedger,
3481 std::uint32_t maxLedger,
3483 -> std::optional<
3485 switch (dbType)
3486 {
3487 case Sqlite: {
3488 auto db = static_cast<SQLiteDatabase*>(&registry_.getRelationalDatabase());
3489 RelationalDatabase::AccountTxPageOptions options{accountId, minLedger, maxLedger, marker, 0, true};
3490 return db->newestAccountTxPage(options);
3491 }
3492 // LCOV_EXCL_START
3493 default: {
3494 UNREACHABLE(
3495 "xrpl::NetworkOPsImp::addAccountHistoryJob : "
3496 "getMoreTxns : invalid database type");
3497 return {};
3498 }
3499 // LCOV_EXCL_STOP
3500 }
3501 };
3502
3503 /*
3504 * search backward until the genesis ledger or asked to stop
3505 */
3506 while (lastLedgerSeq >= 2 && !subInfo.index_->stopHistorical_)
3507 {
3508 int feeChargeCount = 0;
3509 if (auto sptr = subInfo.sinkWptr_.lock(); sptr)
3510 {
3511 sptr->getConsumer().charge(Resource::feeMediumBurdenRPC);
3512 ++feeChargeCount;
3513 }
3514 else
3515 {
3516 JLOG(m_journal.trace()) << "AccountHistory job for account " << toBase58(accountId)
3517 << " no InfoSub. Fee charged " << feeChargeCount << " times.";
3518 return;
3519 }
3520
3521 // try to search in 1024 ledgers till reaching genesis ledgers
3522 auto startLedgerSeq = (lastLedgerSeq > 1024 + 2 ? lastLedgerSeq - 1024 : 2);
3523 JLOG(m_journal.trace()) << "AccountHistory job for account " << toBase58(accountId)
3524 << ", working on ledger range [" << startLedgerSeq << "," << lastLedgerSeq << "]";
3525
3526 auto haveRange = [&]() -> bool {
3527 std::uint32_t validatedMin = UINT_MAX;
3528 std::uint32_t validatedMax = 0;
3529 auto haveSomeValidatedLedgers =
3530 registry_.getLedgerMaster().getValidatedRange(validatedMin, validatedMax);
3531
3532 return haveSomeValidatedLedgers && validatedMin <= startLedgerSeq && lastLedgerSeq <= validatedMax;
3533 }();
3534
3535 if (!haveRange)
3536 {
3537 JLOG(m_journal.debug()) << "AccountHistory reschedule job for account " << toBase58(accountId)
3538 << ", incomplete ledger range [" << startLedgerSeq << "," << lastLedgerSeq
3539 << "]";
3541 return;
3542 }
3543
3545 while (!subInfo.index_->stopHistorical_)
3546 {
3547 auto dbResult = getMoreTxns(startLedgerSeq, lastLedgerSeq, marker);
3548 if (!dbResult)
3549 {
3550 // LCOV_EXCL_START
3551 UNREACHABLE(
3552 "xrpl::NetworkOPsImp::addAccountHistoryJob : "
3553 "getMoreTxns failed");
3554 JLOG(m_journal.debug())
3555 << "AccountHistory job for account " << toBase58(accountId) << " getMoreTxns failed.";
3556 send(rpcError(rpcINTERNAL), true);
3557 return;
3558 // LCOV_EXCL_STOP
3559 }
3560
3561 auto const& txns = dbResult->first;
3562 marker = dbResult->second;
3563 size_t num_txns = txns.size();
3564 for (size_t i = 0; i < num_txns; ++i)
3565 {
3566 auto const& [tx, meta] = txns[i];
3567
3568 if (!tx || !meta)
3569 {
3570 JLOG(m_journal.debug())
3571 << "AccountHistory job for account " << toBase58(accountId) << " empty tx or meta.";
3572 send(rpcError(rpcINTERNAL), true);
3573 return;
3574 }
3575 auto curTxLedger = registry_.getLedgerMaster().getLedgerBySeq(tx->getLedger());
3576 if (!curTxLedger)
3577 {
3578 // LCOV_EXCL_START
3579 UNREACHABLE(
3580 "xrpl::NetworkOPsImp::addAccountHistoryJob : "
3581 "getLedgerBySeq failed");
3582 JLOG(m_journal.debug())
3583 << "AccountHistory job for account " << toBase58(accountId) << " no ledger.";
3584 send(rpcError(rpcINTERNAL), true);
3585 return;
3586 // LCOV_EXCL_STOP
3587 }
3588 std::shared_ptr<STTx const> stTxn = tx->getSTransaction();
3589 if (!stTxn)
3590 {
3591 // LCOV_EXCL_START
3592 UNREACHABLE(
3593 "NetworkOPsImp::addAccountHistoryJob : "
3594 "getSTransaction failed");
3595 JLOG(m_journal.debug())
3596 << "AccountHistory job for account " << toBase58(accountId) << " getSTransaction failed.";
3597 send(rpcError(rpcINTERNAL), true);
3598 return;
3599 // LCOV_EXCL_STOP
3600 }
3601
3602 auto const mRef = std::ref(*meta);
3603 auto const trR = meta->getResultTER();
3604 MultiApiJson jvTx = transJson(stTxn, trR, true, curTxLedger, mRef);
3605
3606 jvTx.set(jss::account_history_tx_index, txHistoryIndex--);
3607 if (i + 1 == num_txns || txns[i + 1].first->getLedger() != tx->getLedger())
3608 jvTx.set(jss::account_history_boundary, true);
3609
3610 if (isFirstTx(tx, meta))
3611 {
3612 jvTx.set(jss::account_history_tx_first, true);
3613 sendMultiApiJson(jvTx, false);
3614
3615 JLOG(m_journal.trace())
3616 << "AccountHistory job for account " << toBase58(accountId) << " done, found last tx.";
3617 return;
3618 }
3619 else
3620 {
3621 sendMultiApiJson(jvTx, false);
3622 }
3623 }
3624
3625 if (marker)
3626 {
3627 JLOG(m_journal.trace()) << "AccountHistory job for account " << toBase58(accountId)
3628 << " paging, marker=" << marker->ledgerSeq << ":" << marker->txnSeq;
3629 }
3630 else
3631 {
3632 break;
3633 }
3634 }
3635
3636 if (!subInfo.index_->stopHistorical_)
3637 {
3638 lastLedgerSeq = startLedgerSeq - 1;
3639 if (lastLedgerSeq <= 1)
3640 {
3641 JLOG(m_journal.trace())
3642 << "AccountHistory job for account " << toBase58(accountId) << " done, reached genesis ledger.";
3643 return;
3644 }
3645 }
3646 }
3647 });
3648}
3649
// Begin historical streaming for an account_history subscription, using
// `ledger` (a validated ledger) as the starting point. Accounts absent from
// that ledger are skipped, as is a genesis account whose Sequence is still 1
// (it has produced no transactions, so there is no history to walk).
// Otherwise the starting ledger sequence is recorded on the subscription
// index and the backward search is scheduled via addAccountHistoryJob().
// NOTE(review): the signature line is hidden in this listing; the class
// declares subAccountHistoryStart(std::shared_ptr<ReadView const> const&
// ledger, SubAccountHistoryInfoWeak& subInfo).
3650void
3652{
3653 subInfo.index_->separationLedgerSeq_ = ledger->seq();
3654 auto const& accountId = subInfo.index_->accountId_;
3655 auto const accountKeylet = keylet::account(accountId);
3656 if (!ledger->exists(accountKeylet))
3657 {
3658 JLOG(m_journal.debug()) << "subAccountHistoryStart, no account " << toBase58(accountId)
3659 << ", no need to add AccountHistory job.";
3660 return;
3661 }
3662 if (accountId == genesisAccountId)
3663 {
3664 if (auto const sleAcct = ledger->read(accountKeylet); sleAcct)
3665 {
3666 // Sequence == 1 means the genesis account never sent a transaction.
3667 if (sleAcct->getFieldU32(sfSequence) == 1)
3668 {
3669 JLOG(m_journal.debug()) << "subAccountHistoryStart, genesis account " << toBase58(accountId)
3670 << " does not have tx, no need to add AccountHistory job.";
3671 return;
3672 }
3673 }
3674 else
3675 {
3676 // exists() succeeded above, so read() failing is impossible.
3677 // LCOV_EXCL_START
3678 UNREACHABLE(
3679 "xrpl::NetworkOPsImp::subAccountHistoryStart : failed to "
3680 "access genesis account");
3681 return;
3682 // LCOV_EXCL_STOP
3683 }
3684 }
3685 subInfo.index_->historyLastLedgerSeq_ = ledger->seq();
3686 subInfo.index_->haveHistorical_ = true;
3687
3688 JLOG(m_journal.debug()) << "subAccountHistoryStart, add AccountHistory job: accountId=" << toBase58(accountId)
3689 << ", currentLedgerSeq=" << ledger->seq();
3690
3691 addAccountHistoryJob(subInfo);
3692}
3691
// Subscribe a listener to an account's live transactions plus a backfill of
// its historical transactions. Returns rpcINVALID_PARAMS when the listener
// already holds a subscription for this account, rpcSUCCESS otherwise — even
// when no validated ledger exists yet (streaming then starts later).
// NOTE(review): this listing hides the signature, the mSubLock guard, and
// the declarations of `ahi` (SubAccountHistoryInfoWeak) and `inner` — TODO
// confirm against the full source.
3694{
3695 if (!isrListener->insertSubAccountHistory(accountId))
3696 {
3697 JLOG(m_journal.debug()) << "subAccountHistory, already subscribed to account " << toBase58(accountId);
3698 return rpcINVALID_PARAMS;
3699 }
3700
3703 auto simIterator = mSubAccountHistory.find(accountId);
3704 if (simIterator == mSubAccountHistory.end())
3705 {
// First subscriber for this account: create the inner (seq -> info) map.
3707 inner.emplace(isrListener->getSeq(), ahi);
3708 mSubAccountHistory.insert(simIterator, std::make_pair(accountId, inner));
3709 }
3710 else
3711 {
3712 simIterator->second.emplace(isrListener->getSeq(), ahi);
3713 }
3714
3715 auto const ledger = registry_.getLedgerMaster().getValidatedLedger();
3716 if (ledger)
3717 {
3718 subAccountHistoryStart(ledger, ahi);
3719 }
3720 else
3721 {
3722 // The node does not have validated ledgers, so wait for
3723 // one before start streaming.
3724 // In this case, the subscription is also considered successful.
3725 JLOG(m_journal.debug()) << "subAccountHistory, no validated ledger yet, delay start";
3726 }
3727
3728 return rpcSUCCESS;
3729}
3730
3731void
3732NetworkOPsImp::unsubAccountHistory(InfoSub::ref isrListener, AccountID const& account, bool historyOnly)
3733{
3734 if (!historyOnly)
3735 isrListener->deleteSubAccountHistory(account);
3736 unsubAccountHistoryInternal(isrListener->getSeq(), account, historyOnly);
3737}
3738
// Core of account_history unsubscription, keyed by listener sequence.
// Always signals the background history job to stop; when !historyOnly it
// also erases the listener's entry and, if that was the last subscriber,
// the whole per-account map entry.
// NOTE(review): the signature line and what appears to be a lock-acquisition
// line are hidden in this listing — confirm against the full source.
3739void
3741{
3743 auto simIterator = mSubAccountHistory.find(account);
3744 if (simIterator != mSubAccountHistory.end())
3745 {
3746 auto& subInfoMap = simIterator->second;
3747 auto subInfoIter = subInfoMap.find(seq);
3748 if (subInfoIter != subInfoMap.end())
3749 {
// Cooperative stop: the AccountHistory job polls stopHistorical_.
3750 subInfoIter->second.index_->stopHistorical_ = true;
3751 }
3752
3753 if (!historyOnly)
3754 {
3755 simIterator->second.erase(seq);
3756 if (simIterator->second.empty())
3757 {
3758 mSubAccountHistory.erase(simIterator);
3759 }
3760 }
3761 JLOG(m_journal.debug()) << "unsubAccountHistory, account " << toBase58(account)
3762 << ", historyOnly = " << (historyOnly ? "true" : "false");
3763 }
3764}
3765
// Subscribe a listener to an order book's events. makeBookListeners() is
// expected to always return a listener group; a null result is treated as
// unreachable. Always reports success.
3766bool
3768{
3769 if (auto listeners = registry_.getOrderBookDB().makeBookListeners(book))
3770 listeners->addSubscriber(isrListener);
3771 else
3772 {
3773 // LCOV_EXCL_START
3774 UNREACHABLE("xrpl::NetworkOPsImp::subBook : null book listeners");
3775 // LCOV_EXCL_STOP
3776 }
3777 return true;
3778}
3779
// Remove listener uSeq from an order book's listener group, if the group
// exists. Always reports success (unsubscribing from a book nobody tracks
// is not an error).
3780bool
3782{
3783 if (auto listeners = registry_.getOrderBookDB().getBookListeners(book))
3784 listeners->removeSubscriber(uSeq);
3785
3786 return true;
3787}
3788
// Force-close the current ledger (standalone mode only, driven by the
// `ledger_accept` RPC): throws outside standalone, otherwise starts a
// consensus round on the last closed ledger, simulates it with the given
// delay, and returns the sequence of the resulting current ledger.
3791{
3792 // This code-path is exclusively used when the server is in standalone
3793 // mode via `ledger_accept`
3794 XRPL_ASSERT(m_standalone, "xrpl::NetworkOPsImp::acceptLedger : is standalone");
3795
3796 if (!m_standalone)
3797 Throw<std::runtime_error>("Operation only possible in STANDALONE mode.");
3798
3799 // FIXME Could we improve on this and remove the need for a specialized
3800 // API in Consensus?
3801 beginConsensus(m_ledgerMaster.getClosedLedger()->header().hash, {});
3802 mConsensus.simulate(registry_.timeKeeper().closeTime(), consensusDelay);
3803 return m_ledgerMaster.getCurrentLedger()->header().seq;
3804}
3805
3806// <-- bool: true=added, false=already there
// Subscribe to the ledger stream. Seeds jvResult with a snapshot of the
// latest validated ledger (index, hash, close time, fees, reserves,
// network id; fee_ref only pre-XRPFees) and the complete-ledger range,
// then registers the listener in the sLedger stream map.
// NOTE(review): the condition guarding the validated_ledgers block and a
// likely lock line are hidden in this listing — confirm in full source.
3807bool
3809{
3810 if (auto lpClosed = m_ledgerMaster.getValidatedLedger())
3811 {
3812 jvResult[jss::ledger_index] = lpClosed->header().seq;
3813 jvResult[jss::ledger_hash] = to_string(lpClosed->header().hash);
3814 jvResult[jss::ledger_time] = Json::Value::UInt(lpClosed->header().closeTime.time_since_epoch().count());
3815 if (!lpClosed->rules().enabled(featureXRPFees))
3816 jvResult[jss::fee_ref] = Config::FEE_UNITS_DEPRECATED;
3817 jvResult[jss::fee_base] = lpClosed->fees().base.jsonClipped();
3818 jvResult[jss::reserve_base] = lpClosed->fees().reserve.jsonClipped();
3819 jvResult[jss::reserve_inc] = lpClosed->fees().increment.jsonClipped();
3820 jvResult[jss::network_id] = registry_.app().config().NETWORK_ID;
3821 }
3822
3824 {
3825 jvResult[jss::validated_ledgers] = registry_.getLedgerMaster().getCompleteLedgers();
3826 }
3827
3829 return mStreamMaps[sLedger].emplace(isrListener->getSeq(), isrListener).second;
3830}
3831
3832// <-- bool: true=added, false=already there
// Register the listener on the book_changes stream; emplace().second
// reports whether this listener sequence was newly added.
3833bool
3835{
3837 return mStreamMaps[sBookChanges].emplace(isrListener->getSeq(), isrListener).second;
3838}
3839
3840// <-- bool: true=erased, false=was not there
// Drop listener uSeq from the ledger stream; erase() count reports
// whether it was present.
3841bool
3843{
3845 return mStreamMaps[sLedger].erase(uSeq);
3846}
3847
3848// <-- bool: true=erased, false=was not there
3849bool
3855
3856// <-- bool: true=added, false=already there
// Register the listener on the manifests stream.
3857bool
3859{
3861 return mStreamMaps[sManifests].emplace(isrListener->getSeq(), isrListener).second;
3862}
3863
3864// <-- bool: true=erased, false=was not there
3865bool
3871
3872// <-- bool: true=added, false=already there
// Subscribe to the server stream. Seeds jvResult with a status snapshot:
// standalone flag, a fresh CSPRNG-derived random value, operating mode,
// load fee base/factor, host id, and the node's public identity key; then
// registers the listener in the sServer stream map.
3873bool
3874NetworkOPsImp::subServer(InfoSub::ref isrListener, Json::Value& jvResult, bool admin)
3875{
3876 uint256 uRandom;
3877
3878 if (m_standalone)
3879 jvResult[jss::stand_alone] = m_standalone;
3880
3881 // CHECKME: is it necessary to provide a random number here?
3882 beast::rngfill(uRandom.begin(), uRandom.size(), crypto_prng());
3883
3884 auto const& feeTrack = registry_.getFeeTrack();
3885 jvResult[jss::random] = to_string(uRandom);
3886 jvResult[jss::server_status] = strOperatingMode(admin);
3887 jvResult[jss::load_base] = feeTrack.getLoadBase();
3888 jvResult[jss::load_factor] = feeTrack.getLoadFactor();
3889 jvResult[jss::hostid] = getHostId(admin);
3890 jvResult[jss::pubkey_node] = toBase58(TokenType::NodePublic, registry_.app().nodeIdentity().first);
3891
3893 return mStreamMaps[sServer].emplace(isrListener->getSeq(), isrListener).second;
3894}
3895
3896// <-- bool: true=erased, false=was not there
// Drop listener uSeq from the server stream.
3897bool
3899{
3901 return mStreamMaps[sServer].erase(uSeq);
3902}
3903
3904// <-- bool: true=added, false=already there
// Register the listener on the validated-transactions stream.
3905bool
3907{
3909 return mStreamMaps[sTransactions].emplace(isrListener->getSeq(), isrListener).second;
3910}
3911
3912// <-- bool: true=erased, false=was not there
3913bool
3919
3920// <-- bool: true=added, false=already there
// Register the listener on the real-time (proposed) transactions stream.
3921bool
3923{
3925 return mStreamMaps[sRTTransactions].emplace(isrListener->getSeq(), isrListener).second;
3926}
3927
3928// <-- bool: true=erased, false=was not there
3929bool
3935
3936// <-- bool: true=added, false=already there
// Register the listener on the validations stream.
3937bool
3939{
3941 return mStreamMaps[sValidations].emplace(isrListener->getSeq(), isrListener).second;
3942}
3943
3944void
3949
3950// <-- bool: true=erased, false=was not there
3951bool
3957
3958// <-- bool: true=added, false=already there
// Register the listener on the peer-status stream.
3959bool
3961{
3963 return mStreamMaps[sPeerStatus].emplace(isrListener->getSeq(), isrListener).second;
3964}
3965
3966// <-- bool: true=erased, false=was not there
3967bool
3973
3974// <-- bool: true=added, false=already there
// Register the listener on the consensus-phase stream.
3975bool
3977{
3979 return mStreamMaps[sConsensusPhase].emplace(isrListener->getSeq(), isrListener).second;
3980}
3981
3982// <-- bool: true=erased, false=was not there
3983bool
3989
// Look up an RPC subscription entry by its URL key; returns an empty
// InfoSub::pointer when no entry exists.
// NOTE(review): the signature and a likely lock line are hidden in this
// listing — confirm against the full source.
3992{
3994
3995 subRpcMapType::iterator it = mRpcSubMap.find(strUrl);
3996
3997 if (it != mRpcSubMap.end())
3998 return it->second;
3999
4000 return InfoSub::pointer();
4001}
4002
// Record an RPC subscription entry under its URL key and hand the entry
// back to the caller. emplace() leaves any existing entry for the same URL
// in place.
// NOTE(review): the signature and a likely lock line are hidden in this
// listing — confirm against the full source.
4005{
4007
4008 mRpcSubMap.emplace(strUrl, rspEntry);
4009
4010 return rspEntry;
4011}
4012
// Remove an RPC subscription entry by URL, but only when no stream map
// still references its listener sequence. Returns true when the entry was
// removed, false when it was absent or still in use.
4013bool
4015{
4017 auto pInfo = findRpcSub(strUrl);
4018
4019 if (!pInfo)
4020 return false;
4021
4022 // check to see if any of the stream maps still hold a weak reference to
4023 // this entry before removing
4024 for (SubMapType const& map : mStreamMaps)
4025 {
4026 if (map.find(pInfo->getSeq()) != map.end())
4027 return false;
4028 }
4029 mRpcSubMap.erase(strUrl);
4030 return true;
4031}
4032
4033#ifndef USE_NEW_BOOK_PAGE
4034
4035// NIKB FIXME this should be looked at. There's no reason why this shouldn't
4036// work, but it demonstrated poor performance.
4037//
4038void
4041 Book const& book,
4042 AccountID const& uTakerID,
4043 bool const bProof,
4044 unsigned int iLimit,
4045 Json::Value const& jvMarker,
4046 Json::Value& jvResult)
4047{ // CAUTION: This is the old get book page logic
4048 Json::Value& jvOffers = (jvResult[jss::offers] = Json::Value(Json::arrayValue));
4049
4051 uint256 const uBookBase = getBookBase(book);
4052 uint256 const uBookEnd = getQualityNext(uBookBase);
4053 uint256 uTipIndex = uBookBase;
4054
4055 if (auto stream = m_journal.trace())
4056 {
4057 stream << "getBookPage:" << book;
4058 stream << "getBookPage: uBookBase=" << uBookBase;
4059 stream << "getBookPage: uBookEnd=" << uBookEnd;
4060 stream << "getBookPage: uTipIndex=" << uTipIndex;
4061 }
4062
4063 ReadView const& view = *lpLedger;
4064
4065 bool const bGlobalFreeze = isGlobalFrozen(view, book.out.account) || isGlobalFrozen(view, book.in.account);
4066
4067 bool bDone = false;
4068 bool bDirectAdvance = true;
4069
4070 std::shared_ptr<SLE const> sleOfferDir;
4071 uint256 offerIndex;
4072 unsigned int uBookEntry;
4073 STAmount saDirRate;
4074
4075 auto const rate = transferRate(view, book.out.account);
4076 auto viewJ = registry_.journal("View");
4077
4078 while (!bDone && iLimit-- > 0)
4079 {
4080 if (bDirectAdvance)
4081 {
4082 bDirectAdvance = false;
4083
4084 JLOG(m_journal.trace()) << "getBookPage: bDirectAdvance";
4085
4086 auto const ledgerIndex = view.succ(uTipIndex, uBookEnd);
4087 if (ledgerIndex)
4088 sleOfferDir = view.read(keylet::page(*ledgerIndex));
4089 else
4090 sleOfferDir.reset();
4091
4092 if (!sleOfferDir)
4093 {
4094 JLOG(m_journal.trace()) << "getBookPage: bDone";
4095 bDone = true;
4096 }
4097 else
4098 {
4099 uTipIndex = sleOfferDir->key();
4100 saDirRate = amountFromQuality(getQuality(uTipIndex));
4101
4102 cdirFirst(view, uTipIndex, sleOfferDir, uBookEntry, offerIndex);
4103
4104 JLOG(m_journal.trace()) << "getBookPage: uTipIndex=" << uTipIndex;
4105 JLOG(m_journal.trace()) << "getBookPage: offerIndex=" << offerIndex;
4106 }
4107 }
4108
4109 if (!bDone)
4110 {
4111 auto sleOffer = view.read(keylet::offer(offerIndex));
4112
4113 if (sleOffer)
4114 {
4115 auto const uOfferOwnerID = sleOffer->getAccountID(sfAccount);
4116 auto const& saTakerGets = sleOffer->getFieldAmount(sfTakerGets);
4117 auto const& saTakerPays = sleOffer->getFieldAmount(sfTakerPays);
4118 STAmount saOwnerFunds;
4119 bool firstOwnerOffer(true);
4120
4121 if (book.out.account == uOfferOwnerID)
4122 {
4123 // If an offer is selling issuer's own IOUs, it is fully
4124 // funded.
4125 saOwnerFunds = saTakerGets;
4126 }
4127 else if (bGlobalFreeze)
4128 {
4129 // If either asset is globally frozen, consider all offers
4130 // that aren't ours to be totally unfunded
4131 saOwnerFunds.clear(book.out);
4132 }
4133 else
4134 {
4135 auto umBalanceEntry = umBalance.find(uOfferOwnerID);
4136 if (umBalanceEntry != umBalance.end())
4137 {
4138 // Found in running balance table.
4139
4140 saOwnerFunds = umBalanceEntry->second;
4141 firstOwnerOffer = false;
4142 }
4143 else
4144 {
4145 // Did not find balance in table.
4146
4147 saOwnerFunds = accountHolds(
4148 view, uOfferOwnerID, book.out.currency, book.out.account, fhZERO_IF_FROZEN, viewJ);
4149
4150 if (saOwnerFunds < beast::zero)
4151 {
4152 // Treat negative funds as zero.
4153
4154 saOwnerFunds.clear();
4155 }
4156 }
4157 }
4158
4159 Json::Value jvOffer = sleOffer->getJson(JsonOptions::none);
4160
4161 STAmount saTakerGetsFunded;
4162 STAmount saOwnerFundsLimit = saOwnerFunds;
4163 Rate offerRate = parityRate;
4164
4165 if (rate != parityRate
4166 // Have a transfer fee.
4167 && uTakerID != book.out.account
4168 // Not taking offers of own IOUs.
4169 && book.out.account != uOfferOwnerID)
4170 // Offer owner not issuing ownfunds
4171 {
4172 // Need to charge a transfer fee to offer owner.
4173 offerRate = rate;
4174 saOwnerFundsLimit = divide(saOwnerFunds, offerRate);
4175 }
4176
4177 if (saOwnerFundsLimit >= saTakerGets)
4178 {
4179 // Sufficient funds no shenanigans.
4180 saTakerGetsFunded = saTakerGets;
4181 }
4182 else
4183 {
4184 // Only provide, if not fully funded.
4185
4186 saTakerGetsFunded = saOwnerFundsLimit;
4187
4188 saTakerGetsFunded.setJson(jvOffer[jss::taker_gets_funded]);
4189 std::min(saTakerPays, multiply(saTakerGetsFunded, saDirRate, saTakerPays.issue()))
4190 .setJson(jvOffer[jss::taker_pays_funded]);
4191 }
4192
4193 STAmount saOwnerPays = (parityRate == offerRate)
4194 ? saTakerGetsFunded
4195 : std::min(saOwnerFunds, multiply(saTakerGetsFunded, offerRate));
4196
4197 umBalance[uOfferOwnerID] = saOwnerFunds - saOwnerPays;
4198
4199 // Include all offers funded and unfunded
4200 Json::Value& jvOf = jvOffers.append(jvOffer);
4201 jvOf[jss::quality] = saDirRate.getText();
4202
4203 if (firstOwnerOffer)
4204 jvOf[jss::owner_funds] = saOwnerFunds.getText();
4205 }
4206 else
4207 {
4208 JLOG(m_journal.warn()) << "Missing offer";
4209 }
4210
4211 if (!cdirNext(view, uTipIndex, sleOfferDir, uBookEntry, offerIndex))
4212 {
4213 bDirectAdvance = true;
4214 }
4215 else
4216 {
4217 JLOG(m_journal.trace()) << "getBookPage: offerIndex=" << offerIndex;
4218 }
4219 }
4220 }
4221
4222 // jvResult[jss::marker] = Json::Value(Json::arrayValue);
4223 // jvResult[jss::nodes] = Json::Value(Json::arrayValue);
4224}
4225
4226#else
4227
4228// This is the new code that uses the book iterators
4229// It has temporarily been disabled
4230
4231void
4234 Book const& book,
4235 AccountID const& uTakerID,
4236 bool const bProof,
4237 unsigned int iLimit,
4238 Json::Value const& jvMarker,
4239 Json::Value& jvResult)
4240{
4241 auto& jvOffers = (jvResult[jss::offers] = Json::Value(Json::arrayValue));
4242
4244
4245 MetaView lesActive(lpLedger, tapNONE, true);
4246 OrderBookIterator obIterator(lesActive, book);
4247
4248 auto const rate = transferRate(lesActive, book.out.account);
4249
4250 bool const bGlobalFreeze = lesActive.isGlobalFrozen(book.out.account) || lesActive.isGlobalFrozen(book.in.account);
4251
4252 while (iLimit-- > 0 && obIterator.nextOffer())
4253 {
4254 SLE::pointer sleOffer = obIterator.getCurrentOffer();
4255 if (sleOffer)
4256 {
4257 auto const uOfferOwnerID = sleOffer->getAccountID(sfAccount);
4258 auto const& saTakerGets = sleOffer->getFieldAmount(sfTakerGets);
4259 auto const& saTakerPays = sleOffer->getFieldAmount(sfTakerPays);
4260 STAmount saDirRate = obIterator.getCurrentRate();
4261 STAmount saOwnerFunds;
4262
4263 if (book.out.account == uOfferOwnerID)
4264 {
4265 // If offer is selling issuer's own IOUs, it is fully funded.
4266 saOwnerFunds = saTakerGets;
4267 }
4268 else if (bGlobalFreeze)
4269 {
4270 // If either asset is globally frozen, consider all offers
4271 // that aren't ours to be totally unfunded
4272 saOwnerFunds.clear(book.out);
4273 }
4274 else
4275 {
4276 auto umBalanceEntry = umBalance.find(uOfferOwnerID);
4277
4278 if (umBalanceEntry != umBalance.end())
4279 {
4280 // Found in running balance table.
4281
4282 saOwnerFunds = umBalanceEntry->second;
4283 }
4284 else
4285 {
4286 // Did not find balance in table.
4287
4288 saOwnerFunds =
4289 lesActive.accountHolds(uOfferOwnerID, book.out.currency, book.out.account, fhZERO_IF_FROZEN);
4290
4291 if (saOwnerFunds.isNegative())
4292 {
4293 // Treat negative funds as zero.
4294
4295 saOwnerFunds.zero();
4296 }
4297 }
4298 }
4299
4300 Json::Value jvOffer = sleOffer->getJson(JsonOptions::none);
4301
4302 STAmount saTakerGetsFunded;
4303 STAmount saOwnerFundsLimit = saOwnerFunds;
4304 Rate offerRate = parityRate;
4305
4306 if (rate != parityRate
4307 // Have a transfer fee.
4308 && uTakerID != book.out.account
4309 // Not taking offers of own IOUs.
4310 && book.out.account != uOfferOwnerID)
4311 // Offer owner not issuing ownfunds
4312 {
4313 // Need to charge a transfer fee to offer owner.
4314 offerRate = rate;
4315 saOwnerFundsLimit = divide(saOwnerFunds, offerRate);
4316 }
4317
4318 if (saOwnerFundsLimit >= saTakerGets)
4319 {
4320 // Sufficient funds no shenanigans.
4321 saTakerGetsFunded = saTakerGets;
4322 }
4323 else
4324 {
4325 // Only provide, if not fully funded.
4326 saTakerGetsFunded = saOwnerFundsLimit;
4327
4328 saTakerGetsFunded.setJson(jvOffer[jss::taker_gets_funded]);
4329
4330 // TODO(tom): The result of this expression is not used - what's
4331 // going on here?
4332 std::min(saTakerPays, multiply(saTakerGetsFunded, saDirRate, saTakerPays.issue()))
4333 .setJson(jvOffer[jss::taker_pays_funded]);
4334 }
4335
4336 STAmount saOwnerPays = (parityRate == offerRate)
4337 ? saTakerGetsFunded
4338 : std::min(saOwnerFunds, multiply(saTakerGetsFunded, offerRate));
4339
4340 umBalance[uOfferOwnerID] = saOwnerFunds - saOwnerPays;
4341
4342 if (!saOwnerFunds.isZero() || uOfferOwnerID == uTakerID)
4343 {
4344 // Only provide funded offers and offers of the taker.
4345 Json::Value& jvOf = jvOffers.append(jvOffer);
4346 jvOf[jss::quality] = saDirRate.getText();
4347 }
4348 }
4349 }
4350
4351 // jvResult[jss::marker] = Json::Value(Json::arrayValue);
4352 // jvResult[jss::nodes] = Json::Value(Json::arrayValue);
4353}
4354
4355#endif
4356
// Polled-collection hook: snapshots the state-accounting counters, folds
// the in-progress interval for the current mode into its duration, and
// publishes per-mode duration and transition-count gauges.
// NOTE(review): the signature line is hidden in this listing.
4357inline void
4359{
4360 auto [counters, mode, start, initialSync] = accounting_.getCounterData();
4361 auto const current =
4362 std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - start);
// Attribute the still-open interval to the mode the server is in now.
4363 counters[static_cast<std::size_t>(mode)].dur += current;
4365
4366 m_stats.disconnected_duration.set(counters[static_cast<std::size_t>(OperatingMode::DISCONNECTED)].dur.count());
4367 m_stats.connected_duration.set(counters[static_cast<std::size_t>(OperatingMode::CONNECTED)].dur.count());
4368 m_stats.syncing_duration.set(counters[static_cast<std::size_t>(OperatingMode::SYNCING)].dur.count());
4369 m_stats.tracking_duration.set(counters[static_cast<std::size_t>(OperatingMode::TRACKING)].dur.count());
4370 m_stats.full_duration.set(counters[static_cast<std::size_t>(OperatingMode::FULL)].dur.count());
4371
4372 m_stats.disconnected_transitions.set(counters[static_cast<std::size_t>(OperatingMode::DISCONNECTED)].transitions);
4373 m_stats.connected_transitions.set(counters[static_cast<std::size_t>(OperatingMode::CONNECTED)].transitions);
4374 m_stats.syncing_transitions.set(counters[static_cast<std::size_t>(OperatingMode::SYNCING)].transitions);
4375 m_stats.tracking_transitions.set(counters[static_cast<std::size_t>(OperatingMode::TRACKING)].transitions);
4376 m_stats.full_transitions.set(counters[static_cast<std::size_t>(OperatingMode::FULL)].transitions);
4377}
4378
// Record a state transition: bump the target mode's transition count,
// capture the initial-sync latency on the first-ever entry into FULL,
// close out the elapsed interval against the outgoing mode, then switch.
// NOTE(review): the signature line is hidden in this listing.
4379void
4381{
4382 auto now = std::chrono::steady_clock::now();
4383
4384 std::lock_guard lock(mutex_);
4385 ++counters_[static_cast<std::size_t>(om)].transitions;
4386 if (om == OperatingMode::FULL && counters_[static_cast<std::size_t>(om)].transitions == 1)
4387 {
// First time reaching FULL: elapsed time since process start is the
// initial sync duration.
4388 initialSyncUs_ = std::chrono::duration_cast<std::chrono::microseconds>(now - processStart_).count();
4389 }
4390 counters_[static_cast<std::size_t>(mode_)].dur +=
4391 std::chrono::duration_cast<std::chrono::microseconds>(now - start_);
4392
4393 mode_ = om;
4394 start_ = now;
4395}
4396
// Output state counters in JSON format: per-mode transition counts and
// cumulative durations (with the open interval folded into the current
// mode), plus the current state's running duration and, once the server
// has reached FULL, the initial sync duration.
// NOTE(review): the signature line and the for-loop's opening line are
// hidden in this listing — the loop iterates modes DISCONNECTED..FULL.
4397void
4399{
4400 auto [counters, mode, start, initialSync] = getCounterData();
4401 auto const current =
4402 std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now() - start);
4403 counters[static_cast<std::size_t>(mode)].dur += current;
4404
4405 obj[jss::state_accounting] = Json::objectValue;
4407 i <= static_cast<std::size_t>(OperatingMode::FULL);
4408 ++i)
4409 {
4410 obj[jss::state_accounting][states_[i]] = Json::objectValue;
4411 auto& state = obj[jss::state_accounting][states_[i]];
4412 state[jss::transitions] = std::to_string(counters[i].transitions);
4413 state[jss::duration_us] = std::to_string(counters[i].dur.count());
4414 }
4415 obj[jss::server_state_duration_us] = std::to_string(current.count());
4416 if (initialSync)
4417 obj[jss::initial_sync_duration_us] = std::to_string(initialSync);
4418}
4419
4420//------------------------------------------------------------------------------
4421
4424 ServiceRegistry& registry,
4426 bool standalone,
4427 std::size_t minPeerCount,
4428 bool startvalid,
4429 JobQueue& job_queue,
4431 ValidatorKeys const& validatorKeys,
4432 boost::asio::io_context& io_svc,
4433 beast::Journal journal,
4434 beast::insight::Collector::ptr const& collector)
4435{
4437 registry,
4438 clock,
4439 standalone,
4440 minPeerCount,
4441 startvalid,
4442 job_queue,
4444 validatorKeys,
4445 io_svc,
4446 journal,
4447 collector);
4448}
4449
4450} // namespace xrpl
T any_of(T... args)
T back_inserter(T... args)
T begin(T... args)
Decorator for streaming out compact json.
Lightweight wrapper to tag static string.
Definition json_value.h:44
Represents a JSON value.
Definition json_value.h:130
Json::UInt UInt
Definition json_value.h:137
Value & append(Value const &value)
Append value to array at the end.
bool isMember(char const *key) const
Return true if the object has a member named key.
Value get(UInt index, Value const &defaultValue) const
If the array contains at least index+1 elements, returns the element value, otherwise returns default...
A generic endpoint for log messages.
Definition Journal.h:40
Stream error() const
Definition Journal.h:318
Stream debug() const
Definition Journal.h:300
Stream info() const
Definition Journal.h:306
Stream trace() const
Severity stream access functions.
Definition Journal.h:294
Stream warn() const
Definition Journal.h:312
A metric for measuring an integral value.
Definition Gauge.h:20
void set(value_type value) const
Set the value on the gauge.
Definition Gauge.h:48
A reference to a handler for performing polled collection.
Definition Hook.h:12
A transaction that is in a closed ledger.
TxMeta const & getMeta() const
boost::container::flat_set< AccountID > const & getAffected() const
std::shared_ptr< STTx const > const & getTxn() const
virtual std::optional< NetClock::time_point > firstUnsupportedExpected() const =0
virtual std::chrono::milliseconds getIOLatency()=0
virtual Config & config()=0
virtual std::optional< PublicKey const > getValidationPublicKey() const =0
virtual std::pair< PublicKey, SecretKey > const & nodeIdentity()=0
bool exists(std::string const &name) const
Returns true if a section with the given name exists.
Section & section(std::string const &name)
Returns the section with the given name.
Specifies an order book.
Definition Book.h:16
Issue in
Definition Book.h:18
Issue out
Definition Book.h:19
Holds transactions which were deferred to the next pass of consensus.
The role of a ClosureCounter is to assist in shutdown by letting callers wait for the completion of c...
std::uint32_t getLoadFee() const
Definition ClusterNode.h:32
NetClock::time_point getReportTime() const
Definition ClusterNode.h:38
PublicKey const & identity() const
Definition ClusterNode.h:44
std::string const & name() const
Definition ClusterNode.h:26
std::size_t size() const
The number of nodes in the cluster list.
Definition Cluster.cpp:30
uint32_t NETWORK_ID
Definition Config.h:138
std::string SERVER_DOMAIN
Definition Config.h:259
int RELAY_UNTRUSTED_VALIDATIONS
Definition Config.h:151
static constexpr std::uint32_t FEE_UNITS_DEPRECATED
Definition Config.h:142
std::size_t NODE_SIZE
Definition Config.h:194
virtual Json::Value getInfo()=0
virtual void clearFailures()=0
std::shared_ptr< InfoSub > pointer
Definition InfoSub.h:33
Currency currency
Definition Issue.h:15
AccountID account
Definition Issue.h:16
A pool of threads to perform work.
Definition JobQueue.h:37
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:145
Json::Value getJson(int c=0)
Definition JobQueue.cpp:172
std::chrono::seconds getValidatedLedgerAge()
bool getValidatedRange(std::uint32_t &minVal, std::uint32_t &maxVal)
std::shared_ptr< Ledger const > getLedgerBySeq(std::uint32_t index)
bool haveValidated()
Whether we have ever fully validated a ledger.
std::size_t getFetchPackCacheSize() const
std::shared_ptr< Ledger const > getClosedLedger()
std::string getCompleteLedgers()
std::shared_ptr< Ledger const > getValidatedLedger()
std::shared_ptr< ReadView const > getPublishedLedger()
std::shared_ptr< ReadView const > getCurrentLedger()
Manages the current fee schedule.
std::uint32_t getClusterFee() const
std::uint32_t getLocalFee() const
std::uint32_t getRemoteFee() const
std::uint32_t getLoadFactor() const
std::uint32_t getLoadBase() const
Manages load sources.
Definition LoadManager.h:26
void heartbeat()
Reset the stall detection timer.
PublicKey getMasterKey(PublicKey const &pk) const
Returns ephemeral signing key's master public key.
Definition Manifest.cpp:284
State accounting records two attributes for each possible server state: 1) Amount of time spent in ea...
void json(Json::Value &obj) const
Output state counters in JSON format.
std::chrono::steady_clock::time_point const processStart_
static std::array< Json::StaticString const, 5 > const states_
std::array< Counters, 5 > counters_
void mode(OperatingMode om)
Record state transition.
std::chrono::steady_clock::time_point start_
Transaction with input flags and results to be applied in batches.
std::shared_ptr< Transaction > const transaction
TransactionStatus(std::shared_ptr< Transaction > t, bool a, bool l, FailHard f)
std::string getHostId(bool forAdmin)
void reportConsensusStateChange(ConsensusPhase phase)
void addAccountHistoryJob(SubAccountHistoryInfoWeak subInfo)
void clearNeedNetworkLedger() override
ServerFeeSummary mLastFeeSummary
Json::Value getOwnerInfo(std::shared_ptr< ReadView const > lpLedger, AccountID const &account) override
DispatchState mDispatchState
std::size_t const minPeerCount_
beast::Journal m_journal
static std::array< char const *, 5 > const states_
std::set< uint256 > pendingValidations_
void pubAccountTransaction(std::shared_ptr< ReadView const > const &ledger, AcceptedLedgerTx const &transaction, bool last)
ClosureCounter< void, boost::system::error_code const & > waitHandlerCounter_
MultiApiJson transJson(std::shared_ptr< STTx const > const &transaction, TER result, bool validated, std::shared_ptr< ReadView const > const &ledger, std::optional< std::reference_wrapper< TxMeta const > > meta)
bool unsubManifests(std::uint64_t uListener) override
void pubPeerStatus(std::function< Json::Value(void)> const &) override
void unsubAccount(InfoSub::ref ispListener, hash_set< AccountID > const &vnaAccountIDs, bool rt) override
bool subManifests(InfoSub::ref ispListener) override
void stateAccounting(Json::Value &obj) override
void pubLedger(std::shared_ptr< ReadView const > const &lpAccepted) override
void stop() override
SubInfoMapType mSubRTAccount
void subAccount(InfoSub::ref ispListener, hash_set< AccountID > const &vnaAccountIDs, bool rt) override
void transactionBatch()
Apply transactions in batches.
bool unsubRTTransactions(std::uint64_t uListener) override
void getBookPage(std::shared_ptr< ReadView const > &lpLedger, Book const &, AccountID const &uTakerID, bool const bProof, unsigned int iLimit, Json::Value const &jvMarker, Json::Value &jvResult) override
bool processTrustedProposal(RCLCxPeerPos proposal) override
error_code_i subAccountHistory(InfoSub::ref ispListener, AccountID const &account) override
subscribe an account's new transactions and retrieve the account's historical transactions
void subAccountHistoryStart(std::shared_ptr< ReadView const > const &ledger, SubAccountHistoryInfoWeak &subInfo)
void pubValidation(std::shared_ptr< STValidation > const &val) override
bool subBook(InfoSub::ref ispListener, Book const &) override
InfoSub::pointer addRpcSub(std::string const &strUrl, InfoSub::ref) override
void endConsensus(std::unique_ptr< std::stringstream > const &clog) override
std::atomic< OperatingMode > mMode
void setMode(OperatingMode om) override
void setAmendmentBlocked() override
void pubConsensus(ConsensusPhase phase)
bool const m_standalone
std::recursive_mutex mSubLock
bool isNeedNetworkLedger() override
DispatchState
Synchronization states for transaction batches.
std::atomic< bool > needNetworkLedger_
boost::asio::steady_timer heartbeatTimer_
bool subConsensus(InfoSub::ref ispListener) override
bool unsubBook(std::uint64_t uListener, Book const &) override
bool unsubLedger(std::uint64_t uListener) override
bool checkLastClosedLedger(Overlay::PeerSequence const &, uint256 &networkClosed)
void pubProposedAccountTransaction(std::shared_ptr< ReadView const > const &ledger, std::shared_ptr< STTx const > const &transaction, TER result)
void unsubAccountHistoryInternal(std::uint64_t seq, AccountID const &account, bool historyOnly) override
void pubValidatedTransaction(std::shared_ptr< ReadView const > const &ledger, AcceptedLedgerTx const &transaction, bool last)
void switchLastClosedLedger(std::shared_ptr< Ledger const > const &newLCL)
std::optional< PublicKey > const validatorPK_
std::atomic< bool > amendmentBlocked_
bool isFull() override
void clearAmendmentWarned() override
void updateLocalTx(ReadView const &view) override
void clearLedgerFetch() override
void apply(std::unique_lock< std::mutex > &batchLock)
Attempt to apply transactions and post-process based on the results.
InfoSub::pointer findRpcSub(std::string const &strUrl) override
bool isAmendmentBlocked() override
std::string strOperatingMode(OperatingMode const mode, bool const admin) const override
std::unique_ptr< LocalTxs > m_localTX
void setStandAlone() override
void setNeedNetworkLedger() override
bool subServer(InfoSub::ref ispListener, Json::Value &jvResult, bool admin) override
void setTimer(boost::asio::steady_timer &timer, std::chrono::milliseconds const &expiry_time, std::function< void()> onExpire, std::function< void()> onError)
bool unsubServer(std::uint64_t uListener) override
SubAccountHistoryMapType mSubAccountHistory
bool unsubConsensus(std::uint64_t uListener) override
std::condition_variable mCond
void pubManifest(Manifest const &) override
RCLConsensus mConsensus
void consensusViewChange() override
ServiceRegistry & registry_
boost::asio::steady_timer accountHistoryTxTimer_
Json::Value getConsensusInfo() override
bool recvValidation(std::shared_ptr< STValidation > const &val, std::string const &source) override
void setUNLBlocked() override
bool unsubValidations(std::uint64_t uListener) override
bool subPeerStatus(InfoSub::ref ispListener) override
void doTransactionAsync(std::shared_ptr< Transaction > transaction, bool bUnlimited, FailHard failtype)
For transactions not submitted by a locally connected client, fire and forget.
ConsensusPhase mLastConsensusPhase
OperatingMode getOperatingMode() const override
std::optional< PublicKey > const validatorMasterPK_
void doTransactionSyncBatch(std::unique_lock< std::mutex > &lock, std::function< bool(std::unique_lock< std::mutex > const &)> retryCallback)
std::array< SubMapType, SubTypes::sLastEntry > mStreamMaps
std::vector< TransactionStatus > mTransactions
bool tryRemoveRpcSub(std::string const &strUrl) override
bool beginConsensus(uint256 const &networkClosed, std::unique_ptr< std::stringstream > const &clog) override
void doTransactionSync(std::shared_ptr< Transaction > transaction, bool bUnlimited, FailHard failType)
For transactions submitted directly by a client, apply batch of transactions and wait for this transa...
void submitTransaction(std::shared_ptr< STTx const > const &) override
void setAmendmentWarned() override
LedgerMaster & m_ledgerMaster
Json::Value getServerInfo(bool human, bool admin, bool counters) override
StateAccounting accounting_
bool subValidations(InfoSub::ref ispListener) override
void setAccountHistoryJobTimer(SubAccountHistoryInfoWeak subInfo)
bool subRTTransactions(InfoSub::ref ispListener) override
std::atomic< bool > unlBlocked_
subRpcMapType mRpcSubMap
bool unsubBookChanges(std::uint64_t uListener) override
void unsubAccountHistory(InfoSub::ref ispListener, AccountID const &account, bool historyOnly) override
unsubscribe an account's transactions
void setStateTimer() override
Called to initially start our timers.
std::size_t getLocalTxCount() override
bool preProcessTransaction(std::shared_ptr< Transaction > &transaction)
void processTransaction(std::shared_ptr< Transaction > &transaction, bool bUnlimited, bool bLocal, FailHard failType) override
Process transactions as they arrive from the network or which are submitted by clients.
bool unsubTransactions(std::uint64_t uListener) override
bool isAmendmentWarned() override
bool subTransactions(InfoSub::ref ispListener) override
std::mutex m_statsMutex
std::mutex validationsMutex_
std::uint32_t acceptLedger(std::optional< std::chrono::milliseconds > consensusDelay) override
Accepts the current transaction tree, return the new ledger's sequence.
SubInfoMapType mSubAccount
void clearUNLBlocked() override
bool isUNLBlocked() override
void pubProposedTransaction(std::shared_ptr< ReadView const > const &ledger, std::shared_ptr< STTx const > const &transaction, TER result) override
std::atomic< bool > amendmentWarned_
boost::asio::steady_timer clusterTimer_
NetworkOPsImp(ServiceRegistry &registry, NetworkOPs::clock_type &clock, bool standalone, std::size_t minPeerCount, bool start_valid, JobQueue &job_queue, LedgerMaster &ledgerMaster, ValidatorKeys const &validatorKeys, boost::asio::io_context &io_svc, beast::Journal journal, beast::insight::Collector::ptr const &collector)
bool unsubPeerStatus(std::uint64_t uListener) override
void reportFeeChange() override
void processTransactionSet(CanonicalTXSet const &set) override
Process a set of transactions synchronously, and ensuring that they are processed in one batch.
void mapComplete(std::shared_ptr< SHAMap > const &map, bool fromAcquire) override
bool subLedger(InfoSub::ref ispListener, Json::Value &jvResult) override
bool isBlocked() override
~NetworkOPsImp() override
Json::Value getLedgerFetchInfo() override
void unsubAccountInternal(std::uint64_t seq, hash_set< AccountID > const &vnaAccountIDs, bool rt) override
bool subBookChanges(InfoSub::ref ispListener) override
Provides server functionality for clients.
Definition NetworkOPs.h:71
void getCountsJson(Json::Value &obj)
Definition Database.cpp:225
std::shared_ptr< OpenView const > current() const
Returns a view to the current open ledger.
Writable ledger view that accumulates state and tx changes.
Definition OpenView.h:45
virtual void processTxn(std::shared_ptr< ReadView const > const &ledger, AcceptedLedgerTx const &alTx, MultiApiJson const &jvObj)=0
virtual BookListeners::pointer makeBookListeners(Book const &)=0
virtual BookListeners::pointer getBookListeners(Book const &)=0
virtual std::uint64_t getPeerDisconnect() const =0
virtual std::optional< std::uint32_t > networkID() const =0
Returns the ID of the network this server is configured for, if any.
virtual std::uint64_t getPeerDisconnectCharges() const =0
virtual std::uint64_t getJqTransOverflow() const =0
virtual std::size_t size() const =0
Returns the number of active peers.
Manages the generic consensus algorithm for use by the RCL.
std::size_t prevProposers() const
Get the number of proposing peers that participated in the previous round.
void simulate(NetClock::time_point const &now, std::optional< std::chrono::milliseconds > consensusDelay)
Json::Value getJson(bool full) const
std::chrono::milliseconds prevRoundTime() const
Get duration of the previous round.
A peer's signed, proposed position for use in RCLConsensus.
PublicKey const & publicKey() const
Public key of peer that sent the proposal.
Represents a set of transactions in RCLConsensus.
Definition RCLCxTx.h:43
Wraps a ledger instance for use in generic Validations LedgerTrie.
static std::string getWordFromBlob(void const *blob, size_t bytes)
Chooses a single dictionary word from the data.
Definition RFC1751.cpp:396
Collects logging information.
std::unique_ptr< std::stringstream > const & ss()
A view into a ledger.
Definition ReadView.h:31
virtual std::optional< key_type > succ(key_type const &key, std::optional< key_type > const &last=std::nullopt) const =0
Return the key of the next state item.
virtual std::shared_ptr< SLE const > read(Keylet const &k) const =0
Return the state item associated with a key.
Issue const & issue() const
Definition STAmount.h:454
std::string getText() const override
Definition STAmount.cpp:639
void setJson(Json::Value &) const
Definition STAmount.cpp:599
std::optional< T > get(std::string const &name) const
std::size_t size() const noexcept
Definition Serializer.h:50
void const * data() const noexcept
Definition Serializer.h:56
void setup(Setup const &setup, beast::Journal journal)
Service registry for dependency injection.
virtual perf::PerfLog & getPerfLog()=0
virtual JobQueue & getJobQueue()=0
virtual RelationalDatabase & getRelationalDatabase()=0
virtual AmendmentTable & getAmendmentTable()=0
virtual ValidatorList & validators()=0
virtual TxQ & getTxQ()=0
virtual Overlay & overlay()=0
virtual NodeStore::Database & getNodeStore()=0
virtual LoadFeeTrack & getFeeTrack()=0
virtual ServerHandler & getServerHandler()=0
virtual OpenLedger & openLedger()=0
virtual Cluster & cluster()=0
virtual InboundLedgers & getInboundLedgers()=0
virtual LedgerMaster & getLedgerMaster()=0
virtual TimeKeeper & timeKeeper()=0
virtual OrderBookDB & getOrderBookDB()=0
virtual TaggedCache< uint256, AcceptedLedger > & getAcceptedLedgerCache()=0
virtual ManifestCache & validatorManifests()=0
virtual beast::Journal journal(std::string const &name)=0
virtual Application & app()=0
time_point now() const override
Returns the current time, using the server's clock.
Definition TimeKeeper.h:43
std::chrono::seconds closeOffset() const
Definition TimeKeeper.h:62
time_point closeTime() const
Returns the predicted close time, in network time.
Definition TimeKeeper.h:55
Metrics getMetrics(OpenView const &view) const
Returns fee metrics in reference fee level units.
Definition TxQ.cpp:1603
static time_point now()
Validator keys and manifest as set in configuration file.
std::optional< PublicKey > localPublicKey() const
This function returns the local validator public key or a std::nullopt.
std::size_t quorum() const
Get quorum value for current trusted key set.
std::optional< TimeKeeper::time_point > expires() const
Return the time when the validator list will expire.
std::size_t count() const
Return the number of configured validator list sites.
std::optional< PublicKey > getTrustedKey(PublicKey const &identity) const
Returns master public key if public key is trusted.
Json::Value jsonClipped() const
Definition XRPAmount.h:196
constexpr double decimalXRP() const
Definition XRPAmount.h:240
iterator begin()
Definition base_uint.h:112
bool isZero() const
Definition base_uint.h:508
static constexpr std::size_t size()
Definition base_uint.h:494
bool isNonZero() const
Definition base_uint.h:513
virtual Json::Value currentJson() const =0
Render currently executing jobs and RPC calls and durations in Json.
virtual Json::Value countersJson() const =0
Render performance counters in Json.
Automatically unlocks and re-locks a unique_lock object.
Definition scope.h:197
T clear(T... args)
T emplace_back(T... args)
T emplace(T... args)
T empty(T... args)
T end(T... args)
T erase(T... args)
T find(T... args)
T get(T... args)
T insert(T... args)
T is_same_v
T is_sorted(T... args)
T lock(T... args)
T make_pair(T... args)
T max(T... args)
T min(T... args)
@ arrayValue
array value (ordered list)
Definition json_value.h:25
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:26
int Int
unsigned int UInt
void rngfill(void *const buffer, std::size_t const bytes, Generator &g)
Definition rngfill.h:14
STL namespace.
std::string const & getVersionString()
Server version.
Definition BuildInfo.cpp:51
std::optional< std::string > encodeCTID(uint32_t ledgerSeq, uint32_t txnIndex, uint32_t networkID) noexcept
Encodes ledger sequence, transaction index, and network ID into a CTID string.
Definition CTID.h:33
Json::Value computeBookChanges(std::shared_ptr< L const > const &lpAccepted)
Definition BookChanges.h:27
void insertMPTokenIssuanceID(Json::Value &response, std::shared_ptr< STTx const > const &transaction, TxMeta const &transactionMeta)
void insertDeliveredAmount(Json::Value &meta, ReadView const &, std::shared_ptr< STTx const > const &serializedTx, TxMeta const &)
Add a delivered_amount field to the meta input/output parameter.
void insertNFTSyntheticInJson(Json::Value &, std::shared_ptr< STTx const > const &, TxMeta const &)
Adds common synthetic fields to transaction-related JSON responses.
Charge const feeMediumBurdenRPC
TER valid(STTx const &tx, ReadView const &view, AccountID const &src, beast::Journal j)
Keylet offer(AccountID const &id, std::uint32_t seq) noexcept
An offer from an account.
Definition Indexes.cpp:235
Keylet account(AccountID const &id) noexcept
AccountID root.
Definition Indexes.cpp:160
Keylet page(uint256 const &root, std::uint64_t index=0) noexcept
A page in a directory.
Definition Indexes.cpp:331
Rate rate(Env &env, Account const &account, std::uint32_t const &seq)
Definition escrow.cpp:50
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
std::unique_ptr< FeeVote > make_FeeVote(FeeSetup const &setup, beast::Journal journal)
Create an instance of the FeeVote logic.
STAmount divide(STAmount const &amount, Rate const &rate)
Definition Rate2.cpp:69
@ terQUEUED
Definition TER.h:205
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
bool isTerRetry(TER x) noexcept
Definition TER.h:643
@ fhZERO_IF_FROZEN
Definition View.h:58
@ fhIGNORE_FREEZE
Definition View.h:58
csprng_engine & crypto_prng()
The default cryptographically secure PRNG.
std::optional< std::uint64_t > mulDiv(std::uint64_t value, std::uint64_t mul, std::uint64_t div)
Return value*mul/div accurately.
Json::Value getJson(LedgerFill const &fill)
Return a new Json::Value representing the ledger with given options.
@ INVALID
Definition Transaction.h:29
@ OBSOLETE
Definition Transaction.h:35
@ INCLUDED
Definition Transaction.h:30
constexpr std::uint32_t tfInnerBatchTxn
Definition TxFlags.h:41
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:597
std::string strHex(FwdIt begin, FwdIt end)
Definition strHex.h:10
Rules makeRulesGivenLedger(DigestAwareReadView const &ledger, Rules const &current)
Definition ReadView.cpp:50
@ tefPAST_SEQ
Definition TER.h:155
std::uint64_t getQuality(uint256 const &uBase)
Definition Indexes.cpp:126
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
Definition AccountID.cpp:92
FeeSetup setup_FeeVote(Section const &section)
Definition Config.cpp:1024
STAmount accountHolds(ReadView const &view, AccountID const &account, Currency const &currency, AccountID const &issuer, FreezeHandling zeroIfFrozen, beast::Journal j, SpendableHandling includeFullBalance=shSIMPLE_BALANCE)
Definition View.cpp:392
Number root(Number f, unsigned d)
Definition Number.cpp:938
bool transResultInfo(TER code, std::string &token, std::string &text)
Definition TER.cpp:228
bool cdirNext(ReadView const &view, uint256 const &root, std::shared_ptr< SLE const > &page, unsigned int &index, uint256 &entry)
Returns the next entry in the directory, advancing the index.
Definition View.cpp:112
STAmount multiply(STAmount const &amount, Rate const &rate)
Definition Rate2.cpp:34
static auto const genesisAccountId
Seed generateSeed(std::string const &passPhrase)
Generate a seed deterministically.
Definition Seed.cpp:57
bool cdirFirst(ReadView const &view, uint256 const &root, std::shared_ptr< SLE const > &page, unsigned int &index, uint256 &entry)
Returns the first entry in the directory, advancing the index.
Definition View.cpp:101
std::pair< PublicKey, SecretKey > generateKeyPair(KeyType type, Seed const &seed)
Generate a key pair deterministically.
std::unique_ptr< NetworkOPs > make_NetworkOPs(ServiceRegistry &registry, NetworkOPs::clock_type &clock, bool standalone, std::size_t minPeerCount, bool start_valid, JobQueue &job_queue, LedgerMaster &ledgerMaster, ValidatorKeys const &validatorKeys, boost::asio::io_context &io_svc, beast::Journal journal, beast::insight::Collector::ptr const &collector)
@ current
This was a new validation and was added.
constexpr std::size_t maxPoppedTransactions
STAmount accountFunds(ReadView const &view, AccountID const &id, STAmount const &saDefault, FreezeHandling freezeHandling, beast::Journal j)
Definition View.cpp:515
bool isGlobalFrozen(ReadView const &view, AccountID const &issuer)
Definition View.cpp:138
STAmount amountFromQuality(std::uint64_t rate)
Definition STAmount.cpp:927
@ jtNETOP_CLUSTER
Definition Job.h:54
@ jtCLIENT_CONSENSUS
Definition Job.h:27
@ jtTXN_PROC
Definition Job.h:61
@ jtCLIENT_ACCT_HIST
Definition Job.h:28
@ jtTRANSACTION
Definition Job.h:41
@ jtCLIENT_FEE_CHANGE
Definition Job.h:26
@ jtBATCH
Definition Job.h:44
bool isTefFailure(TER x) noexcept
Definition TER.h:637
Rate transferRate(ReadView const &view, AccountID const &issuer)
Returns IOU issuer transfer fee as Rate.
Definition View.cpp:699
auto constexpr muldiv_max
Definition mulDiv.h:8
uint256 getQualityNext(uint256 const &uBase)
Definition Indexes.cpp:119
ConsensusPhase
Phases of consensus for a single ledger round.
send_if_pred< Predicate > send_if(std::shared_ptr< Message > const &m, Predicate const &f)
Helper function to aid in type deduction.
Definition predicates.h:54
void forAllApiVersions(Fn const &fn, Args &&... args)
Definition ApiVersion.h:144
AccountID calcAccountID(PublicKey const &pk)
uint256 getBookBase(Book const &book)
Definition Indexes.cpp:98
Json::Value rpcError(error_code_i iError)
Definition RPCErr.cpp:12
std::string to_string_iso(date::sys_time< Duration > tp)
Definition chrono.h:67
std::unique_ptr< LocalTxs > make_LocalTxs()
Definition LocalTxs.cpp:170
std::pair< Validity, std::string > checkValidity(HashRouter &router, STTx const &tx, Rules const &rules, Config const &config)
Checks transaction signature and local checks.
Definition apply.cpp:22
ApplyFlags
Definition ApplyView.h:10
@ tapNONE
Definition ApplyView.h:11
@ tapFAIL_HARD
Definition ApplyView.h:15
@ tapUNLIMITED
Definition ApplyView.h:22
bool isTelLocal(TER x) noexcept
Definition TER.h:625
@ ledgerMaster
ledger master data for signing
@ proposal
proposal for signing
@ temINVALID_FLAG
Definition TER.h:91
@ temBAD_SIGNATURE
Definition TER.h:85
bool isTesSuccess(TER x) noexcept
Definition TER.h:649
static std::uint32_t trunc32(std::uint64_t v)
static std::array< char const *, 5 > const stateNames
void handleNewValidation(Application &app, std::shared_ptr< STValidation > const &val, std::string const &source, BypassAccept const bypassAccept, std::optional< beast::Journal > j)
Handle a new validation.
OperatingMode
Specifies the mode under which the server believes it's operating.
Definition NetworkOPs.h:50
@ TRACKING
convinced we agree with the network
@ DISCONNECTED
not ready to process requests
@ CONNECTED
convinced we are talking to the network
@ FULL
we have the ledger and can even validate
@ SYNCING
fallen slightly behind
std::shared_ptr< STTx const > sterilize(STTx const &stx)
Sterilize a transaction.
Definition STTx.cpp:767
bool isTemMalformed(TER x) noexcept
Definition TER.h:631
@ tesSUCCESS
Definition TER.h:225
error_code_i
Definition ErrorCodes.h:20
@ rpcINTERNAL
Definition ErrorCodes.h:110
@ rpcINVALID_PARAMS
Definition ErrorCodes.h:64
@ rpcSUCCESS
Definition ErrorCodes.h:24
Rate const parityRate
A transfer rate signifying a 1:1 exchange.
@ warnRPC_AMENDMENT_BLOCKED
Definition ErrorCodes.h:153
@ warnRPC_UNSUPPORTED_MAJORITY
Definition ErrorCodes.h:152
@ warnRPC_EXPIRED_VALIDATOR_LIST
Definition ErrorCodes.h:154
T owns_lock(T... args)
T ref(T... args)
T reserve(T... args)
T reset(T... args)
T set_intersection(T... args)
T size(T... args)
T str(T... args)
PublicKey masterKey
The master key associated with this manifest.
Definition Manifest.h:66
std::string serialized
The manifest in serialized form.
Definition Manifest.h:63
Blob getMasterSignature() const
Returns manifest master key signature.
Definition Manifest.cpp:220
std::string domain
The domain, if one was specified in the manifest; empty otherwise.
Definition Manifest.h:78
std::optional< Blob > getSignature() const
Returns manifest signature.
Definition Manifest.cpp:209
std::optional< PublicKey > signingKey
The ephemeral key associated with this manifest.
Definition Manifest.h:72
std::uint32_t sequence
The sequence number of this manifest.
Definition Manifest.h:75
Server fees published on server subscription.
std::optional< TxQ::Metrics > em
bool operator!=(ServerFeeSummary const &b) const
bool operator==(ServerFeeSummary const &b) const
beast::insight::Gauge syncing_duration
beast::insight::Gauge tracking_duration
beast::insight::Gauge connected_duration
beast::insight::Gauge tracking_transitions
Stats(Handler const &handler, beast::insight::Collector::ptr const &collector)
beast::insight::Gauge connected_transitions
beast::insight::Gauge full_transitions
beast::insight::Gauge disconnected_duration
beast::insight::Gauge syncing_transitions
beast::insight::Gauge disconnected_transitions
beast::insight::Gauge full_duration
beast::insight::Hook hook
SubAccountHistoryIndex(AccountID const &accountId)
std::shared_ptr< SubAccountHistoryIndex > index_
std::shared_ptr< SubAccountHistoryIndex > index_
Represents a transfer rate.
Definition Rate.h:20
Data format for exchanging consumption information across peers.
Definition Gossip.h:12
std::vector< Item > items
Definition Gossip.h:24
Changes in trusted nodes after updating validator list.
hash_set< NodeID > added
hash_set< NodeID > removed
Structure returned by TxQ::getMetrics, expressed in reference fee level units.
Definition TxQ.h:145
void set(char const *key, auto const &v)
IsMemberResult isMember(char const *key) const
Select all peers (except optional excluded) that are in our cluster.
Definition predicates.h:115
Sends a message to all peers.
Definition predicates.h:12
T swap(T... args)
T time_since_epoch(T... args)
T to_string(T... args)
T unlock(T... args)
T value_or(T... args)
T what(T... args)