1#include <xrpld/app/consensus/RCLConsensus.h>
2#include <xrpld/app/consensus/RCLValidations.h>
3#include <xrpld/app/ledger/AcceptedLedger.h>
4#include <xrpld/app/ledger/InboundLedgers.h>
5#include <xrpld/app/ledger/LedgerMaster.h>
6#include <xrpld/app/ledger/LedgerToJson.h>
7#include <xrpld/app/ledger/LocalTxs.h>
8#include <xrpld/app/ledger/OpenLedger.h>
9#include <xrpld/app/ledger/OrderBookDB.h>
10#include <xrpld/app/ledger/TransactionMaster.h>
11#include <xrpld/app/main/LoadManager.h>
12#include <xrpld/app/main/Tuning.h>
13#include <xrpld/app/misc/AmendmentTable.h>
14#include <xrpld/app/misc/DeliverMax.h>
15#include <xrpld/app/misc/HashRouter.h>
16#include <xrpld/app/misc/LoadFeeTrack.h>
17#include <xrpld/app/misc/NetworkOPs.h>
18#include <xrpld/app/misc/Transaction.h>
19#include <xrpld/app/misc/TxQ.h>
20#include <xrpld/app/misc/ValidatorKeys.h>
21#include <xrpld/app/misc/ValidatorList.h>
22#include <xrpld/app/misc/detail/AccountTxPaging.h>
23#include <xrpld/app/rdb/backend/SQLiteDatabase.h>
24#include <xrpld/app/tx/apply.h>
25#include <xrpld/consensus/Consensus.h>
26#include <xrpld/consensus/ConsensusParms.h>
27#include <xrpld/overlay/Cluster.h>
28#include <xrpld/overlay/Overlay.h>
29#include <xrpld/overlay/predicates.h>
30#include <xrpld/rpc/BookChanges.h>
31#include <xrpld/rpc/CTID.h>
32#include <xrpld/rpc/DeliveredAmount.h>
33#include <xrpld/rpc/MPTokenIssuanceID.h>
34#include <xrpld/rpc/ServerHandler.h>
36#include <xrpl/basics/UptimeClock.h>
37#include <xrpl/basics/mulDiv.h>
38#include <xrpl/basics/safe_cast.h>
39#include <xrpl/basics/scope.h>
40#include <xrpl/beast/utility/rngfill.h>
41#include <xrpl/core/PerfLog.h>
42#include <xrpl/crypto/RFC1751.h>
43#include <xrpl/crypto/csprng.h>
44#include <xrpl/protocol/BuildInfo.h>
45#include <xrpl/protocol/Feature.h>
46#include <xrpl/protocol/MultiApiJson.h>
47#include <xrpl/protocol/NFTSyntheticSerializer.h>
48#include <xrpl/protocol/RPCErr.h>
49#include <xrpl/protocol/TxFlags.h>
50#include <xrpl/protocol/jss.h>
51#include <xrpl/resource/Fees.h>
52#include <xrpl/resource/ResourceManager.h>
54#include <boost/asio/ip/host_name.hpp>
55#include <boost/asio/steady_timer.hpp>
94 "xrpl::NetworkOPsImp::TransactionStatus::TransactionStatus : "
137 std::chrono::steady_clock::time_point
start_ =
198 return !(*
this != b);
217 boost::asio::io_context& io_svc,
231 app_.logs().journal(
"FeeVote")),
234 app.getInboundTransactions(),
235 beast::get_abstract_clock<
std::chrono::steady_clock>(),
237 app_.logs().journal(
"LedgerConsensus"))
239 validatorKeys.keys ? validatorKeys.keys->publicKey
242 validatorKeys.keys ? validatorKeys.keys->masterPublicKey
437 getServerInfo(
bool human,
bool admin,
bool counters)
override;
464 TER result)
override;
498 bool historyOnly)
override;
504 bool historyOnly)
override;
576 catch (boost::system::system_error
const& e)
579 <<
"NetworkOPs: heartbeatTimer cancel error: " << e.what();
586 catch (boost::system::system_error
const& e)
589 <<
"NetworkOPs: clusterTimer cancel error: " << e.what();
596 catch (boost::system::system_error
const& e)
599 <<
"NetworkOPs: accountHistoryTxTimer cancel error: "
604 using namespace std::chrono_literals;
614 boost::asio::steady_timer& timer,
797 template <
class Handler>
799 Handler
const& handler,
801 :
hook(collector->make_hook(handler))
804 "Disconnected_duration"))
807 "Connected_duration"))
809 collector->make_gauge(
"State_Accounting",
"Syncing_duration"))
812 "Tracking_duration"))
814 collector->make_gauge(
"State_Accounting",
"Full_duration"))
817 "Disconnected_transitions"))
820 "Connected_transitions"))
823 "Syncing_transitions"))
826 "Tracking_transitions"))
828 collector->make_gauge(
"State_Accounting",
"Full_transitions"))
857 {
"disconnected",
"connected",
"syncing",
"tracking",
"full"}};
919 static std::string const hostname = boost::asio::ip::host_name();
926 static std::string const shroudedHostId = [
this]() {
932 return shroudedHostId;
947 boost::asio::steady_timer& timer,
954 [
this, onExpire, onError](boost::system::error_code
const& e) {
955 if ((e.value() == boost::system::errc::success) &&
956 (!m_job_queue.isStopped()))
961 if (e.value() != boost::system::errc::success &&
962 e.value() != boost::asio::error::operation_aborted)
965 JLOG(m_journal.error())
966 <<
"Timer got error '" << e.message()
967 <<
"'. Restarting timer.";
972 timer.expires_after(expiry_time);
973 timer.async_wait(std::move(*optionalCountedHandler));
978NetworkOPsImp::setHeartbeatTimer()
982 mConsensus.parms().ledgerGRANULARITY,
984 m_job_queue.addJob(jtNETOP_TIMER,
"NetOPs.heartbeat", [this]() {
985 processHeartbeatTimer();
988 [
this]() { setHeartbeatTimer(); });
992NetworkOPsImp::setClusterTimer()
994 using namespace std::chrono_literals;
1001 processClusterTimer();
1004 [
this]() { setClusterTimer(); });
1010 JLOG(m_journal.debug()) <<
"Scheduling AccountHistory job for account "
1012 using namespace std::chrono_literals;
1014 accountHistoryTxTimer_,
1016 [
this, subInfo]() { addAccountHistoryJob(subInfo); },
1017 [
this, subInfo]() { setAccountHistoryJobTimer(subInfo); });
1021NetworkOPsImp::processHeartbeatTimer()
1024 "Heartbeat Timer", mConsensus.validating(), m_journal);
1032 std::size_t const numPeers = app_.overlay().size();
1035 if (numPeers < minPeerCount_)
1037 if (mMode != OperatingMode::DISCONNECTED)
1039 setMode(OperatingMode::DISCONNECTED);
1041 ss <<
"Node count (" << numPeers <<
") has fallen "
1042 <<
"below required minimum (" << minPeerCount_ <<
").";
1043 JLOG(m_journal.warn()) << ss.
str();
1044 CLOG(clog.
ss()) <<
"set mode to DISCONNECTED: " << ss.
str();
1049 <<
"already DISCONNECTED. too few peers (" << numPeers
1050 <<
"), need at least " << minPeerCount_;
1057 setHeartbeatTimer();
1062 if (mMode == OperatingMode::DISCONNECTED)
1064 setMode(OperatingMode::CONNECTED);
1065 JLOG(m_journal.info())
1066 <<
"Node count (" << numPeers <<
") is sufficient.";
1067 CLOG(clog.
ss()) <<
"setting mode to CONNECTED based on " << numPeers
1073 auto origMode = mMode.load();
1074 CLOG(clog.
ss()) <<
"mode: " << strOperatingMode(origMode,
true);
1075 if (mMode == OperatingMode::SYNCING)
1076 setMode(OperatingMode::SYNCING);
1077 else if (mMode == OperatingMode::CONNECTED)
1078 setMode(OperatingMode::CONNECTED);
1079 auto newMode = mMode.load();
1080 if (origMode != newMode)
1083 <<
", changing to " << strOperatingMode(newMode,
true);
1085 CLOG(clog.
ss()) <<
". ";
1088 mConsensus.timerEntry(app_.timeKeeper().closeTime(), clog.
ss());
1090 CLOG(clog.
ss()) <<
"consensus phase " << to_string(mLastConsensusPhase);
1092 if (mLastConsensusPhase != currPhase)
1094 reportConsensusStateChange(currPhase);
1095 mLastConsensusPhase = currPhase;
1096 CLOG(clog.
ss()) <<
" changed to " << to_string(mLastConsensusPhase);
1098 CLOG(clog.
ss()) <<
". ";
1100 setHeartbeatTimer();
1104NetworkOPsImp::processClusterTimer()
1106 if (app_.cluster().size() == 0)
1109 using namespace std::chrono_literals;
1111 bool const update = app_.cluster().update(
1112 app_.nodeIdentity().first,
1114 (m_ledgerMaster.getValidatedLedgerAge() <= 4min)
1115 ? app_.getFeeTrack().getLocalFee()
1117 app_.timeKeeper().now());
1121 JLOG(m_journal.debug()) <<
"Too soon to send cluster update";
1126 protocol::TMCluster cluster;
1127 app_.cluster().for_each([&cluster](
ClusterNode const& node) {
1128 protocol::TMClusterNode& n = *cluster.add_clusternodes();
1133 n.set_nodename(node.
name());
1137 for (
auto& item : gossip.
items)
1139 protocol::TMLoadSource& node = *cluster.add_loadsources();
1140 node.set_name(to_string(item.address));
1141 node.set_cost(item.balance);
1143 app_.overlay().foreach(
send_if(
1155 if (mode == OperatingMode::FULL && admin)
1157 auto const consensusMode = mConsensus.mode();
1158 if (consensusMode != ConsensusMode::wrongLedger)
1160 if (consensusMode == ConsensusMode::proposing)
1163 if (mConsensus.validating())
1164 return "validating";
1174 if (isNeedNetworkLedger())
1182 m_ledgerMaster.getValidatedRules().enabled(featureBatch))
1184 JLOG(m_journal.error())
1185 <<
"Submitted transaction invalid: tfInnerBatchTxn flag present.";
1192 auto const txid = trans->getTransactionID();
1193 auto const flags = app_.getHashRouter().getFlags(txid);
1195 if ((flags & HashRouterFlags::BAD) != HashRouterFlags::UNDEFINED)
1197 JLOG(m_journal.warn()) <<
"Submitted transaction cached bad";
1204 app_.getHashRouter(),
1206 m_ledgerMaster.getValidatedRules(),
1209 if (validity != Validity::Valid)
1211 JLOG(m_journal.warn())
1212 <<
"Submitted transaction invalid: " << reason;
1218 JLOG(m_journal.warn())
1219 <<
"Exception checking transaction " << txid <<
": " << ex.
what();
1228 m_job_queue.addJob(
jtTRANSACTION,
"submitTxn", [
this, tx]() {
1230 processTransaction(t,
false,
false, FailHard::no);
1237 auto const newFlags = app_.getHashRouter().getFlags(transaction->getID());
1239 if ((newFlags & HashRouterFlags::BAD) != HashRouterFlags::UNDEFINED)
1242 JLOG(m_journal.warn()) << transaction->getID() <<
": cached bad!\n";
1243 transaction->setStatus(
INVALID);
1248 auto const view = m_ledgerMaster.getCurrentLedger();
1253 auto const sttx = *transaction->getSTransaction();
1254 if (sttx.isFlag(
tfInnerBatchTxn) && view->rules().enabled(featureBatch))
1256 transaction->setStatus(
INVALID);
1258 app_.getHashRouter().setFlags(
1259 transaction->getID(), HashRouterFlags::BAD);
1266 auto const [validity, reason] =
1267 checkValidity(app_.getHashRouter(), sttx, view->rules(), app_.config());
1269 validity == Validity::Valid,
1270 "xrpl::NetworkOPsImp::processTransaction : valid validity");
1273 if (validity == Validity::SigBad)
1275 JLOG(m_journal.info()) <<
"Transaction has bad signature: " << reason;
1276 transaction->setStatus(
INVALID);
1278 app_.getHashRouter().setFlags(
1279 transaction->getID(), HashRouterFlags::BAD);
1284 app_.getMasterTransaction().canonicalize(&transaction);
1290NetworkOPsImp::processTransaction(
1296 auto ev = m_job_queue.makeLoadEvent(
jtTXN_PROC,
"ProcessTXN");
1299 if (!preProcessTransaction(transaction))
1303 doTransactionSync(transaction, bUnlimited, failType);
1305 doTransactionAsync(transaction, bUnlimited, failType);
1309NetworkOPsImp::doTransactionAsync(
1316 if (transaction->getApplying())
1319 mTransactions.push_back(
1321 transaction->setApplying();
1323 if (mDispatchState == DispatchState::none)
1325 if (m_job_queue.addJob(
1326 jtBATCH,
"transactionBatch", [
this]() { transactionBatch(); }))
1328 mDispatchState = DispatchState::scheduled;
1334NetworkOPsImp::doTransactionSync(
1341 if (!transaction->getApplying())
1343 mTransactions.push_back(
1345 transaction->setApplying();
1348 doTransactionSyncBatch(
1350 return transaction->getApplying();
1355NetworkOPsImp::doTransactionSyncBatch(
1361 if (mDispatchState == DispatchState::running)
1370 if (mTransactions.size())
1373 if (m_job_queue.addJob(
jtBATCH,
"transactionBatch", [
this]() {
1377 mDispatchState = DispatchState::scheduled;
1381 }
while (retryCallback(lock));
1387 auto ev = m_job_queue.makeLoadEvent(
jtTXN_PROC,
"ProcessTXNSet");
1390 for (
auto const& [_, tx] :
set)
1395 if (transaction->getStatus() ==
INVALID)
1397 if (!reason.
empty())
1399 JLOG(m_journal.trace())
1400 <<
"Exception checking transaction: " << reason;
1402 app_.getHashRouter().setFlags(
1403 tx->getTransactionID(), HashRouterFlags::BAD);
1408 if (!preProcessTransaction(transaction))
1419 for (
auto& transaction : candidates)
1421 if (!transaction->getApplying())
1423 transactions.
emplace_back(transaction,
false,
false, FailHard::no);
1424 transaction->setApplying();
1428 if (mTransactions.empty())
1429 mTransactions.swap(transactions);
1432 mTransactions.reserve(mTransactions.size() + transactions.
size());
1433 for (
auto& t : transactions)
1434 mTransactions.push_back(std::move(t));
1436 if (mTransactions.empty())
1438 JLOG(m_journal.debug()) <<
"No transaction to process!";
1445 "xrpl::NetworkOPsImp::processTransactionSet has lock");
1447 mTransactions.begin(), mTransactions.end(), [](
auto const& t) {
1448 return t.transaction->getApplying();
1454NetworkOPsImp::transactionBatch()
1458 if (mDispatchState == DispatchState::running)
1461 while (mTransactions.size())
1472 mTransactions.
swap(transactions);
1474 !transactions.
empty(),
1475 "xrpl::NetworkOPsImp::apply : non-empty transactions");
1477 mDispatchState != DispatchState::running,
1478 "xrpl::NetworkOPsImp::apply : is not running");
1480 mDispatchState = DispatchState::running;
1486 bool changed =
false;
1500 if (e.failType == FailHard::yes)
1503 auto const result = app_.getTxQ().apply(
1504 app_, view, e.transaction->getSTransaction(), flags, j);
1505 e.result = result.ter;
1506 e.applied = result.applied;
1507 changed = changed || result.applied;
1516 if (
auto const l = m_ledgerMaster.getValidatedLedger())
1517 validatedLedgerIndex = l->header().seq;
1519 auto newOL = app_.openLedger().current();
1522 e.transaction->clearSubmitResult();
1526 pubProposedTransaction(
1527 newOL, e.transaction->getSTransaction(), e.result);
1528 e.transaction->setApplied();
1531 e.transaction->setResult(e.result);
1534 app_.getHashRouter().setFlags(
1535 e.transaction->getID(), HashRouterFlags::BAD);
1544 JLOG(m_journal.info())
1545 <<
"TransactionResult: " << token <<
": " << human;
1550 bool addLocal = e.local;
1554 JLOG(m_journal.debug())
1555 <<
"Transaction is now included in open ledger";
1556 e.transaction->setStatus(
INCLUDED);
1561 auto const& txCur = e.transaction->getSTransaction();
1564 for (
auto txNext = m_ledgerMaster.popAcctTransaction(txCur);
1566 txNext = m_ledgerMaster.popAcctTransaction(txCur), ++count)
1573 if (t->getApplying())
1575 submit_held.
emplace_back(t,
false,
false, FailHard::no);
1584 JLOG(m_journal.info()) <<
"Transaction is obsolete";
1585 e.transaction->setStatus(
OBSOLETE);
1589 JLOG(m_journal.debug())
1590 <<
"Transaction is likely to claim a"
1591 <<
" fee, but is queued until fee drops";
1593 e.transaction->setStatus(
HELD);
1597 m_ledgerMaster.addHeldTransaction(e.transaction);
1598 e.transaction->setQueued();
1599 e.transaction->setKept();
1605 if (e.failType != FailHard::yes)
1607 auto const lastLedgerSeq =
1608 e.transaction->getSTransaction()->at(
1609 ~sfLastLedgerSequence);
1610 auto const ledgersLeft = lastLedgerSeq
1612 m_ledgerMaster.getCurrentLedgerIndex()
1631 (ledgersLeft && ledgersLeft <= LocalTxs::holdLedgers) ||
1632 app_.getHashRouter().setFlags(
1633 e.transaction->getID(), HashRouterFlags::HELD))
1636 JLOG(m_journal.debug())
1637 <<
"Transaction should be held: " << e.result;
1638 e.transaction->setStatus(
HELD);
1639 m_ledgerMaster.addHeldTransaction(e.transaction);
1640 e.transaction->setKept();
1643 JLOG(m_journal.debug())
1644 <<
"Not holding transaction "
1645 << e.transaction->getID() <<
": "
1646 << (e.local ?
"local" :
"network") <<
", "
1647 <<
"result: " << e.result <<
" ledgers left: "
1648 << (ledgersLeft ? to_string(*ledgersLeft)
1654 JLOG(m_journal.debug())
1655 <<
"Status other than success " << e.result;
1656 e.transaction->setStatus(
INVALID);
1659 auto const enforceFailHard =
1660 e.failType == FailHard::yes && !
isTesSuccess(e.result);
1662 if (addLocal && !enforceFailHard)
1664 m_localTX->push_back(
1665 m_ledgerMaster.getCurrentLedgerIndex(),
1666 e.transaction->getSTransaction());
1667 e.transaction->setKept();
1671 ((mMode != OperatingMode::FULL) &&
1672 (e.failType != FailHard::yes) && e.local) ||
1677 app_.getHashRouter().shouldRelay(e.transaction->getID());
1678 if (
auto const sttx = *(e.transaction->getSTransaction());
1686 protocol::TMTransaction tx;
1690 tx.set_rawtransaction(s.
data(), s.
size());
1691 tx.set_status(protocol::tsCURRENT);
1692 tx.set_receivetimestamp(
1693 app_.timeKeeper().now().time_since_epoch().count());
1696 app_.overlay().relay(e.transaction->getID(), tx, *toSkip);
1697 e.transaction->setBroadcast();
1701 if (validatedLedgerIndex)
1703 auto [fee, accountSeq, availableSeq] =
1704 app_.getTxQ().getTxRequiredFeeAndSeq(
1705 *newOL, e.transaction->getSTransaction());
1706 e.transaction->setCurrentLedgerState(
1707 *validatedLedgerIndex, fee, accountSeq, availableSeq);
1715 e.transaction->clearApplying();
1717 if (!submit_held.
empty())
1719 if (mTransactions.empty())
1720 mTransactions.swap(submit_held);
1723 mTransactions.reserve(mTransactions.size() + submit_held.
size());
1724 for (
auto& e : submit_held)
1725 mTransactions.push_back(std::move(e));
1731 mDispatchState = DispatchState::none;
1739NetworkOPsImp::getOwnerInfo(
1744 auto root = keylet::ownerDir(account);
1745 auto sleNode = lpLedger->read(keylet::page(
root));
1752 for (
auto const& uDirEntry : sleNode->getFieldV256(sfIndexes))
1754 auto sleCur = lpLedger->read(keylet::child(uDirEntry));
1757 "xrpl::NetworkOPsImp::getOwnerInfo : non-null child SLE");
1759 switch (sleCur->getType())
1762 if (!jvObjects.
isMember(jss::offers))
1763 jvObjects[jss::offers] =
1766 jvObjects[jss::offers].
append(
1767 sleCur->getJson(JsonOptions::none));
1770 case ltRIPPLE_STATE:
1771 if (!jvObjects.
isMember(jss::ripple_lines))
1773 jvObjects[jss::ripple_lines] =
1777 jvObjects[jss::ripple_lines].
append(
1778 sleCur->getJson(JsonOptions::none));
1781 case ltACCOUNT_ROOT:
1786 "xrpl::NetworkOPsImp::getOwnerInfo : invalid "
1793 uNodeDir = sleNode->getFieldU64(sfIndexNext);
1797 sleNode = lpLedger->read(keylet::page(
root, uNodeDir));
1800 "xrpl::NetworkOPsImp::getOwnerInfo : read next page");
1813NetworkOPsImp::isBlocked()
1815 return isAmendmentBlocked() || isUNLBlocked();
1819NetworkOPsImp::isAmendmentBlocked()
1821 return amendmentBlocked_;
1825NetworkOPsImp::setAmendmentBlocked()
1827 amendmentBlocked_ =
true;
1828 setMode(OperatingMode::CONNECTED);
1832NetworkOPsImp::isAmendmentWarned()
1834 return !amendmentBlocked_ && amendmentWarned_;
1838NetworkOPsImp::setAmendmentWarned()
1840 amendmentWarned_ =
true;
1844NetworkOPsImp::clearAmendmentWarned()
1846 amendmentWarned_ =
false;
1850NetworkOPsImp::isUNLBlocked()
1856NetworkOPsImp::setUNLBlocked()
1859 setMode(OperatingMode::CONNECTED);
1863NetworkOPsImp::clearUNLBlocked()
1865 unlBlocked_ =
false;
1869NetworkOPsImp::checkLastClosedLedger(
1878 JLOG(m_journal.trace()) <<
"NetworkOPsImp::checkLastClosedLedger";
1880 auto const ourClosed = m_ledgerMaster.getClosedLedger();
1885 uint256 closedLedger = ourClosed->header().hash;
1886 uint256 prevClosedLedger = ourClosed->header().parentHash;
1887 JLOG(m_journal.trace()) <<
"OurClosed: " << closedLedger;
1888 JLOG(m_journal.trace()) <<
"PrevClosed: " << prevClosedLedger;
1893 auto& validations = app_.getValidations();
1894 JLOG(m_journal.debug())
1895 <<
"ValidationTrie " <<
Json::Compact(validations.getJsonTrie());
1899 peerCounts[closedLedger] = 0;
1900 if (mMode >= OperatingMode::TRACKING)
1901 peerCounts[closedLedger]++;
1903 for (
auto& peer : peerList)
1905 uint256 peerLedger = peer->getClosedLedgerHash();
1908 ++peerCounts[peerLedger];
1911 for (
auto const& it : peerCounts)
1912 JLOG(m_journal.debug()) <<
"L: " << it.first <<
" n=" << it.second;
1914 uint256 preferredLCL = validations.getPreferredLCL(
1916 m_ledgerMaster.getValidLedgerIndex(),
1919 bool switchLedgers = preferredLCL != closedLedger;
1921 closedLedger = preferredLCL;
1923 if (switchLedgers && (closedLedger == prevClosedLedger))
1926 JLOG(m_journal.info()) <<
"We won't switch to our own previous ledger";
1927 networkClosed = ourClosed->header().hash;
1928 switchLedgers =
false;
1931 networkClosed = closedLedger;
1936 auto consensus = m_ledgerMaster.getLedgerByHash(closedLedger);
1939 consensus = app_.getInboundLedgers().acquire(
1940 closedLedger, 0, InboundLedger::Reason::CONSENSUS);
1943 (!m_ledgerMaster.canBeCurrent(consensus) ||
1944 !m_ledgerMaster.isCompatible(
1945 *consensus, m_journal.debug(),
"Not switching")))
1949 networkClosed = ourClosed->header().hash;
1953 JLOG(m_journal.warn()) <<
"We are not running on the consensus ledger";
1954 JLOG(m_journal.info()) <<
"Our LCL: " << ourClosed->header().hash
1956 JLOG(m_journal.info()) <<
"Net LCL " << closedLedger;
1958 if ((mMode == OperatingMode::TRACKING) || (mMode == OperatingMode::FULL))
1960 setMode(OperatingMode::CONNECTED);
1968 switchLastClosedLedger(consensus);
1975NetworkOPsImp::switchLastClosedLedger(
1979 JLOG(m_journal.error())
1980 <<
"JUMP last closed ledger to " << newLCL->header().hash;
1982 clearNeedNetworkLedger();
1985 app_.getTxQ().processClosedLedger(app_, *newLCL,
true);
1992 auto retries = m_localTX->getTxSet();
1993 auto const lastVal = app_.getLedgerMaster().getValidatedLedger();
1998 rules.
emplace(app_.config().features);
1999 app_.openLedger().accept(
2010 return app_.getTxQ().accept(app_, view);
2014 m_ledgerMaster.switchLCL(newLCL);
2016 protocol::TMStatusChange s;
2017 s.set_newevent(protocol::neSWITCHED_LEDGER);
2018 s.set_ledgerseq(newLCL->header().seq);
2019 s.set_networktime(app_.timeKeeper().now().time_since_epoch().count());
2020 s.set_ledgerhashprevious(
2021 newLCL->header().parentHash.begin(),
2022 newLCL->header().parentHash.size());
2024 newLCL->header().hash.begin(), newLCL->header().hash.size());
2026 app_.overlay().foreach(
2031NetworkOPsImp::beginConsensus(
2037 "xrpl::NetworkOPsImp::beginConsensus : nonzero input");
2039 auto closingInfo = m_ledgerMaster.getCurrentLedger()->header();
2041 JLOG(m_journal.info()) <<
"Consensus time for #" << closingInfo.seq
2042 <<
" with LCL " << closingInfo.parentHash;
2044 auto prevLedger = m_ledgerMaster.getLedgerByHash(closingInfo.parentHash);
2049 if (mMode == OperatingMode::FULL)
2051 JLOG(m_journal.warn()) <<
"Don't have LCL, going to tracking";
2052 setMode(OperatingMode::TRACKING);
2053 CLOG(clog) <<
"beginConsensus Don't have LCL, going to tracking. ";
2056 CLOG(clog) <<
"beginConsensus no previous ledger. ";
2061 prevLedger->header().hash == closingInfo.parentHash,
2062 "xrpl::NetworkOPsImp::beginConsensus : prevLedger hash matches "
2065 closingInfo.parentHash ==
2066 m_ledgerMaster.getClosedLedger()->header().hash,
2067 "xrpl::NetworkOPsImp::beginConsensus : closedLedger parent matches "
2070 app_.validators().setNegativeUNL(prevLedger->negativeUNL());
2071 TrustChanges const changes = app_.validators().updateTrusted(
2072 app_.getValidations().getCurrentNodeIDs(),
2073 closingInfo.parentCloseTime,
2076 app_.getHashRouter());
2078 if (!changes.
added.empty() || !changes.
removed.empty())
2080 app_.getValidations().trustChanged(changes.
added, changes.
removed);
2082 app_.getAmendmentTable().trustChanged(
2083 app_.validators().getQuorumKeys().second);
2086 mConsensus.startRound(
2087 app_.timeKeeper().closeTime(),
2095 if (mLastConsensusPhase != currPhase)
2097 reportConsensusStateChange(currPhase);
2098 mLastConsensusPhase = currPhase;
2101 JLOG(m_journal.debug()) <<
"Initiating consensus engine";
2108 auto const& peerKey = peerPos.
publicKey();
2109 if (validatorPK_ == peerKey || validatorMasterPK_ == peerKey)
2120 JLOG(m_journal.error())
2121 <<
"Received a proposal signed by MY KEY from a peer. This may "
2122 "indicate a misconfiguration where another node has the same "
2123 "validator key, or may be caused by unusual message routing and "
2128 return mConsensus.peerProposal(app_.timeKeeper().closeTime(), peerPos);
2139 protocol::TMHaveTransactionSet msg;
2140 msg.set_hash(map->getHash().as_uint256().begin(), 256 / 8);
2141 msg.set_status(protocol::tsHAVE);
2142 app_.overlay().foreach(
2147 mConsensus.gotTxSet(app_.timeKeeper().closeTime(),
RCLTxSet{map});
2153 uint256 deadLedger = m_ledgerMaster.getClosedLedger()->header().parentHash;
2155 for (
auto const& it : app_.overlay().getActivePeers())
2157 if (it && (it->getClosedLedgerHash() == deadLedger))
2159 JLOG(m_journal.trace()) <<
"Killing obsolete peer status";
2166 checkLastClosedLedger(app_.overlay().getActivePeers(), networkClosed);
2168 if (networkClosed.
isZero())
2170 CLOG(clog) <<
"endConsensus last closed ledger is zero. ";
2180 if (((mMode == OperatingMode::CONNECTED) ||
2181 (mMode == OperatingMode::SYNCING)) &&
2187 if (!needNetworkLedger_)
2188 setMode(OperatingMode::TRACKING);
2191 if (((mMode == OperatingMode::CONNECTED) ||
2192 (mMode == OperatingMode::TRACKING)) &&
2198 auto current = m_ledgerMaster.getCurrentLedger();
2199 if (app_.timeKeeper().now() <
2200 (
current->header().parentCloseTime +
2201 2 *
current->header().closeTimeResolution))
2203 setMode(OperatingMode::FULL);
2207 beginConsensus(networkClosed, clog);
2211NetworkOPsImp::consensusViewChange()
2213 if ((mMode == OperatingMode::FULL) || (mMode == OperatingMode::TRACKING))
2215 setMode(OperatingMode::CONNECTED);
2225 if (!mStreamMaps[sManifests].empty())
2229 jvObj[jss::type] =
"manifestReceived";
2232 jvObj[jss::signing_key] =
2236 jvObj[jss::signature] =
strHex(*sig);
2239 jvObj[jss::domain] = mo.
domain;
2242 for (
auto i = mStreamMaps[sManifests].begin();
2243 i != mStreamMaps[sManifests].end();)
2245 if (
auto p = i->second.lock())
2247 p->send(jvObj,
true);
2252 i = mStreamMaps[sManifests].erase(i);
2258NetworkOPsImp::ServerFeeSummary::ServerFeeSummary(
2262 : loadFactorServer{loadFeeTrack.getLoadFactor()}
2263 , loadBaseServer{loadFeeTrack.getLoadBase()}
2265 , em{
std::move(escalationMetrics)}
2275 em.has_value() != b.
em.has_value())
2281 em->minProcessingFeeLevel != b.
em->minProcessingFeeLevel ||
2282 em->openLedgerFeeLevel != b.
em->openLedgerFeeLevel ||
2283 em->referenceFeeLevel != b.
em->referenceFeeLevel);
2316 jvObj[jss::type] =
"serverStatus";
2318 jvObj[jss::load_base] = f.loadBaseServer;
2319 jvObj[jss::load_factor_server] = f.loadFactorServer;
2320 jvObj[jss::base_fee] = f.baseFee.jsonClipped();
2325 safe_cast<std::uint64_t>(f.loadFactorServer),
2327 f.em->openLedgerFeeLevel,
2329 f.em->referenceFeeLevel)
2332 jvObj[jss::load_factor] =
trunc32(loadFactor);
2333 jvObj[jss::load_factor_fee_escalation] =
2334 f.em->openLedgerFeeLevel.jsonClipped();
2335 jvObj[jss::load_factor_fee_queue] =
2336 f.em->minProcessingFeeLevel.jsonClipped();
2337 jvObj[jss::load_factor_fee_reference] =
2338 f.em->referenceFeeLevel.jsonClipped();
2341 jvObj[jss::load_factor] = f.loadFactorServer;
2355 p->send(jvObj,
true);
2372 if (!streamMap.empty())
2375 jvObj[jss::type] =
"consensusPhase";
2376 jvObj[jss::consensus] =
to_string(phase);
2378 for (
auto i = streamMap.begin(); i != streamMap.end();)
2380 if (
auto p = i->second.lock())
2382 p->send(jvObj,
true);
2387 i = streamMap.erase(i);
2403 auto const signerPublic = val->getSignerPublic();
2405 jvObj[jss::type] =
"validationReceived";
2406 jvObj[jss::validation_public_key] =
2408 jvObj[jss::ledger_hash] =
to_string(val->getLedgerHash());
2409 jvObj[jss::signature] =
strHex(val->getSignature());
2410 jvObj[jss::full] = val->isFull();
2411 jvObj[jss::flags] = val->getFlags();
2412 jvObj[jss::signing_time] = *(*val)[~sfSigningTime];
2413 jvObj[jss::data] =
strHex(val->getSerializer().slice());
2416 if (
auto version = (*val)[~sfServerVersion])
2419 if (
auto cookie = (*val)[~sfCookie])
2422 if (
auto hash = (*val)[~sfValidatedHash])
2423 jvObj[jss::validated_hash] =
strHex(*hash);
2425 auto const masterKey =
2428 if (masterKey != signerPublic)
2433 if (
auto const seq = (*val)[~sfLedgerSequence])
2434 jvObj[jss::ledger_index] = *seq;
2436 if (val->isFieldPresent(sfAmendments))
2439 for (
auto const& amendment : val->getFieldV256(sfAmendments))
2440 jvObj[jss::amendments].append(
to_string(amendment));
2443 if (
auto const closeTime = (*val)[~sfCloseTime])
2444 jvObj[jss::close_time] = *closeTime;
2446 if (
auto const loadFee = (*val)[~sfLoadFee])
2447 jvObj[jss::load_fee] = *loadFee;
2449 if (
auto const baseFee = val->at(~sfBaseFee))
2450 jvObj[jss::base_fee] =
static_cast<double>(*baseFee);
2452 if (
auto const reserveBase = val->at(~sfReserveBase))
2453 jvObj[jss::reserve_base] = *reserveBase;
2455 if (
auto const reserveInc = val->at(~sfReserveIncrement))
2456 jvObj[jss::reserve_inc] = *reserveInc;
2460 if (
auto const baseFeeXRP = ~val->at(~sfBaseFeeDrops);
2461 baseFeeXRP && baseFeeXRP->native())
2462 jvObj[jss::base_fee] = baseFeeXRP->xrp().jsonClipped();
2464 if (
auto const reserveBaseXRP = ~val->at(~sfReserveBaseDrops);
2465 reserveBaseXRP && reserveBaseXRP->native())
2466 jvObj[jss::reserve_base] = reserveBaseXRP->xrp().jsonClipped();
2468 if (
auto const reserveIncXRP = ~val->at(~sfReserveIncrementDrops);
2469 reserveIncXRP && reserveIncXRP->native())
2470 jvObj[jss::reserve_inc] = reserveIncXRP->xrp().jsonClipped();
2479 if (jvTx.
isMember(jss::ledger_index))
2481 jvTx[jss::ledger_index] =
2489 if (
auto p = i->second.lock())
2493 [&](
Json::Value const& jv) { p->send(jv,
true); });
2513 jvObj[jss::type] =
"peerStatusChange";
2522 p->send(jvObj,
true);
2536 using namespace std::chrono_literals;
2568 <<
"recvValidation " << val->getLedgerHash() <<
" from " << source;
2584 <<
"Exception thrown for handling new validation "
2585 << val->getLedgerHash() <<
": " << e.
what();
2590 <<
"Unknown exception thrown for handling new validation "
2591 << val->getLedgerHash();
2603 ss <<
"VALIDATION: " << val->render() <<
" master_key: ";
2640 "This server is amendment blocked, and must be updated to be "
2641 "able to stay in sync with the network.";
2648 "This server has an expired validator list. validators.txt "
2649 "may be incorrectly configured or some [validator_list_sites] "
2650 "may be unreachable.";
2657 "One or more unsupported amendments have reached majority. "
2658 "Upgrade to the latest version before they are activated "
2659 "to avoid being amendment blocked.";
2660 if (
auto const expected =
2664 d[jss::expected_date] = expected->time_since_epoch().count();
2665 d[jss::expected_date_UTC] =
to_string(*expected);
2669 if (warnings.size())
2670 info[jss::warnings] = std::move(warnings);
2685 info[jss::time] =
to_string(std::chrono::floor<std::chrono::microseconds>(
2689 info[jss::network_ledger] =
"waiting";
2691 info[jss::validation_quorum] =
2699 info[jss::node_size] =
"tiny";
2702 info[jss::node_size] =
"small";
2705 info[jss::node_size] =
"medium";
2708 info[jss::node_size] =
"large";
2711 info[jss::node_size] =
"huge";
2720 info[jss::validator_list_expires] =
2721 safe_cast<Json::UInt>(when->time_since_epoch().count());
2723 info[jss::validator_list_expires] = 0;
2733 if (*when == TimeKeeper::time_point::max())
2735 x[jss::expiration] =
"never";
2736 x[jss::status] =
"active";
2743 x[jss::status] =
"active";
2745 x[jss::status] =
"expired";
2750 x[jss::status] =
"unknown";
2751 x[jss::expiration] =
"unknown";
2755#if defined(GIT_COMMIT_HASH) || defined(GIT_BRANCH)
2758#ifdef GIT_COMMIT_HASH
2759 x[jss::hash] = GIT_COMMIT_HASH;
2762 x[jss::branch] = GIT_BRANCH;
2767 info[jss::io_latency_ms] =
2775 info[jss::pubkey_validator] =
2780 info[jss::pubkey_validator] =
"none";
2790 info[jss::counters][jss::nodestore] = nodestore;
2794 info[jss::pubkey_node] =
2800 info[jss::amendment_blocked] =
true;
2814 lastClose[jss::converge_time_s] =
2819 lastClose[jss::converge_time] =
2823 info[jss::last_close] = lastClose;
2831 info[jss::network_id] =
static_cast<Json::UInt>(*netid);
2833 auto const escalationMetrics =
2841 auto const loadFactorFeeEscalation =
2843 escalationMetrics.openLedgerFeeLevel,
2845 escalationMetrics.referenceFeeLevel)
2849 safe_cast<std::uint64_t>(loadFactorServer), loadFactorFeeEscalation);
2853 info[jss::load_base] = loadBaseServer;
2854 info[jss::load_factor] =
trunc32(loadFactor);
2855 info[jss::load_factor_server] = loadFactorServer;
2862 info[jss::load_factor_fee_escalation] =
2863 escalationMetrics.openLedgerFeeLevel.jsonClipped();
2864 info[jss::load_factor_fee_queue] =
2865 escalationMetrics.minProcessingFeeLevel.jsonClipped();
2866 info[jss::load_factor_fee_reference] =
2867 escalationMetrics.referenceFeeLevel.jsonClipped();
2871 info[jss::load_factor] =
2872 static_cast<double>(loadFactor) / loadBaseServer;
2874 if (loadFactorServer != loadFactor)
2875 info[jss::load_factor_server] =
2876 static_cast<double>(loadFactorServer) / loadBaseServer;
2881 if (fee != loadBaseServer)
2882 info[jss::load_factor_local] =
2883 static_cast<double>(fee) / loadBaseServer;
2885 if (fee != loadBaseServer)
2886 info[jss::load_factor_net] =
2887 static_cast<double>(fee) / loadBaseServer;
2889 if (fee != loadBaseServer)
2890 info[jss::load_factor_cluster] =
2891 static_cast<double>(fee) / loadBaseServer;
2893 if (escalationMetrics.openLedgerFeeLevel !=
2894 escalationMetrics.referenceFeeLevel &&
2895 (admin || loadFactorFeeEscalation != loadFactor))
2896 info[jss::load_factor_fee_escalation] =
2897 escalationMetrics.openLedgerFeeLevel.decimalFromReference(
2898 escalationMetrics.referenceFeeLevel);
2899 if (escalationMetrics.minProcessingFeeLevel !=
2900 escalationMetrics.referenceFeeLevel)
2901 info[jss::load_factor_fee_queue] =
2902 escalationMetrics.minProcessingFeeLevel.decimalFromReference(
2903 escalationMetrics.referenceFeeLevel);
2916 XRPAmount const baseFee = lpClosed->fees().base;
2918 l[jss::seq] =
Json::UInt(lpClosed->header().seq);
2919 l[jss::hash] =
to_string(lpClosed->header().hash);
2924 l[jss::reserve_base] = lpClosed->fees().reserve.jsonClipped();
2925 l[jss::reserve_inc] = lpClosed->fees().increment.jsonClipped();
2927 lpClosed->header().closeTime.time_since_epoch().count());
2932 l[jss::reserve_base_xrp] = lpClosed->fees().reserve.decimalXRP();
2933 l[jss::reserve_inc_xrp] = lpClosed->fees().increment.decimalXRP();
2936 std::abs(closeOffset.count()) >= 60)
2937 l[jss::close_time_offset] =
2945 Json::UInt(age < highAgeThreshold ? age.count() : 0);
2949 auto lCloseTime = lpClosed->header().closeTime;
2951 if (lCloseTime <= closeTime)
2953 using namespace std::chrono_literals;
2954 auto age = closeTime - lCloseTime;
2956 Json::UInt(age < highAgeThreshold ? age.count() : 0);
2962 info[jss::validated_ledger] = l;
2964 info[jss::closed_ledger] = l;
2968 info[jss::published_ledger] =
"none";
2969 else if (lpPublished->header().seq != lpClosed->header().seq)
2970 info[jss::published_ledger] = lpPublished->header().seq;
2975 info[jss::jq_trans_overflow] =
2977 info[jss::peer_disconnects] =
2979 info[jss::peer_disconnects_resources] =
2984 "http",
"https",
"peer",
"ws",
"ws2",
"wss",
"wss2"};
2992 !(port.admin_nets_v4.empty() && port.admin_nets_v6.empty() &&
2993 port.admin_user.empty() && port.admin_password.empty()))
3007 for (
auto const& p : proto)
3008 jv[jss::protocol].append(p);
3015 auto const optPort = grpcSection.
get(
"port");
3016 if (optPort && grpcSection.get(
"ip"))
3019 jv[jss::port] = *optPort;
3021 jv[jss::protocol].
append(
"grpc");
3024 info[jss::ports] = std::move(ports);
3070 [&](
Json::Value const& jv) { p->send(jv, true); });
3095 lpAccepted->header().hash, alpAccepted);
3099 alpAccepted->getLedger().
get() == lpAccepted.
get(),
3100 "xrpl::NetworkOPsImp::pubLedger : accepted input");
3104 <<
"Publishing ledger " << lpAccepted->header().seq <<
" "
3105 << lpAccepted->header().hash;
3113 jvObj[jss::type] =
"ledgerClosed";
3114 jvObj[jss::ledger_index] = lpAccepted->header().seq;
3115 jvObj[jss::ledger_hash] =
to_string(lpAccepted->header().hash);
3117 lpAccepted->header().closeTime.time_since_epoch().count());
3121 if (!lpAccepted->rules().enabled(featureXRPFees))
3123 jvObj[jss::fee_base] = lpAccepted->fees().base.jsonClipped();
3124 jvObj[jss::reserve_base] = lpAccepted->fees().reserve.jsonClipped();
3125 jvObj[jss::reserve_inc] =
3126 lpAccepted->fees().increment.jsonClipped();
3128 jvObj[jss::txn_count] =
Json::UInt(alpAccepted->size());
3132 jvObj[jss::validated_ledgers] =
3142 p->send(jvObj,
true);
3160 p->send(jvObj,
true);
3169 static bool firstTime =
true;
3176 for (
auto& inner : outer.second)
3178 auto& subInfo = inner.second;
3179 if (subInfo.index_->separationLedgerSeq_ == 0)
3182 alpAccepted->getLedger(), subInfo);
3191 for (
auto const& accTx : *alpAccepted)
3195 lpAccepted, *accTx, accTx == *(--alpAccepted->end()));
3222 "reportConsensusStateChange->pubConsensus",
3253 jvObj[jss::type] =
"transaction";
3257 jvObj[jss::transaction] =
3264 jvObj[jss::meta], *ledger, transaction, meta->
get());
3267 jvObj[jss::meta], transaction, meta->
get());
3271 if (
auto const& lookup = ledger->txRead(transaction->getTransactionID());
3272 lookup.second && lookup.second->isFieldPresent(sfTransactionIndex))
3274 uint32_t
const txnSeq = lookup.second->getFieldU32(sfTransactionIndex);
3276 if (transaction->isFieldPresent(sfNetworkID))
3277 netID = transaction->getFieldU32(sfNetworkID);
3282 jvObj[jss::ctid] = *ctid;
3284 if (!ledger->open())
3285 jvObj[jss::ledger_hash] =
to_string(ledger->header().hash);
3289 jvObj[jss::ledger_index] = ledger->header().seq;
3290 jvObj[jss::transaction][jss::date] =
3291 ledger->header().closeTime.time_since_epoch().count();
3292 jvObj[jss::validated] =
true;
3293 jvObj[jss::close_time_iso] =
to_string_iso(ledger->header().closeTime);
3299 jvObj[jss::validated] =
false;
3300 jvObj[jss::ledger_current_index] = ledger->header().seq;
3303 jvObj[jss::status] = validated ?
"closed" :
"proposed";
3304 jvObj[jss::engine_result] = sToken;
3305 jvObj[jss::engine_result_code] = result;
3306 jvObj[jss::engine_result_message] = sHuman;
3308 if (transaction->getTxnType() == ttOFFER_CREATE)
3310 auto const account = transaction->getAccountID(sfAccount);
3311 auto const amount = transaction->getFieldAmount(sfTakerGets);
3314 if (account != amount.issue().account)
3322 jvObj[jss::transaction][jss::owner_funds] = ownerFunds.getText();
3330 [&]<
unsigned Version>(
3332 RPC::insertDeliverMax(
3333 jvTx[jss::transaction], transaction->getTxnType(), Version);
3335 if constexpr (Version > 1)
3337 jvTx[jss::tx_json] = jvTx.removeMember(jss::transaction);
3338 jvTx[jss::hash] = hash;
3342 jvTx[jss::transaction][jss::hash] = hash;
3355 auto const& stTxn = transaction.
getTxn();
3359 auto const trResult = transaction.
getResult();
3374 [&](
Json::Value const& jv) { p->send(jv, true); });
3391 [&](
Json::Value const& jv) { p->send(jv, true); });
3416 auto const currLedgerSeq = ledger->seq();
3423 for (
auto const& affectedAccount : transaction.
getAffected())
3428 auto it = simiIt->second.begin();
3430 while (it != simiIt->second.end())
3441 it = simiIt->second.erase(it);
3448 auto it = simiIt->second.begin();
3449 while (it != simiIt->second.end())
3460 it = simiIt->second.erase(it);
3467 auto& subs = historyIt->second;
3468 auto it = subs.begin();
3469 while (it != subs.end())
3472 if (currLedgerSeq <= info.index_->separationLedgerSeq_)
3486 it = subs.erase(it);
3497 <<
"pubAccountTransaction: "
3498 <<
"proposed=" << iProposed <<
", accepted=" << iAccepted;
3500 if (!notify.
empty() || !accountHistoryNotify.
empty())
3502 auto const& stTxn = transaction.
getTxn();
3506 auto const trResult = transaction.
getResult();
3512 isrListener->getApiVersion(),
3513 [&](
Json::Value const& jv) { isrListener->send(jv,
true); });
3517 jvObj.
set(jss::account_history_boundary,
true);
3520 jvObj.
isMember(jss::account_history_tx_stream) ==
3522 "xrpl::NetworkOPsImp::pubAccountTransaction : "
3523 "account_history_tx_stream not set");
3524 for (
auto& info : accountHistoryNotify)
3526 auto& index = info.index_;
3527 if (index->forwardTxIndex_ == 0 && !index->haveHistorical_)
3528 jvObj.
set(jss::account_history_tx_first,
true);
3530 jvObj.
set(jss::account_history_tx_index, index->forwardTxIndex_++);
3533 info.sink_->getApiVersion(),
3534 [&](
Json::Value const& jv) { info.sink_->send(jv,
true); });
3559 for (
auto const& affectedAccount : tx->getMentionedAccounts())
3564 auto it = simiIt->second.begin();
3566 while (it != simiIt->second.end())
3577 it = simiIt->second.erase(it);
3584 JLOG(
m_journal.
trace()) <<
"pubProposedAccountTransaction: " << iProposed;
3586 if (!notify.
empty() || !accountHistoryNotify.
empty())
3593 isrListener->getApiVersion(),
3594 [&](
Json::Value const& jv) { isrListener->send(jv,
true); });
3597 jvObj.
isMember(jss::account_history_tx_stream) ==
3599 "xrpl::NetworkOPs::pubProposedAccountTransaction : "
3600 "account_history_tx_stream not set");
3601 for (
auto& info : accountHistoryNotify)
3603 auto& index = info.index_;
3604 if (index->forwardTxIndex_ == 0 && !index->haveHistorical_)
3605 jvObj.
set(jss::account_history_tx_first,
true);
3606 jvObj.
set(jss::account_history_tx_index, index->forwardTxIndex_++);
3608 info.sink_->getApiVersion(),
3609 [&](
Json::Value const& jv) { info.sink_->send(jv,
true); });
3626 for (
auto const& naAccountID : vnaAccountIDs)
3629 <<
"subAccount: account: " <<
toBase58(naAccountID);
3631 isrListener->insertSubAccountInfo(naAccountID, rt);
3636 for (
auto const& naAccountID : vnaAccountIDs)
3638 auto simIterator = subMap.
find(naAccountID);
3639 if (simIterator == subMap.
end())
3643 usisElement[isrListener->getSeq()] = isrListener;
3645 subMap.
insert(simIterator, make_pair(naAccountID, usisElement));
3650 simIterator->second[isrListener->getSeq()] = isrListener;
3661 for (
auto const& naAccountID : vnaAccountIDs)
3664 isrListener->deleteSubAccountInfo(naAccountID, rt);
3681 for (
auto const& naAccountID : vnaAccountIDs)
3683 auto simIterator = subMap.
find(naAccountID);
3685 if (simIterator != subMap.
end())
3688 simIterator->second.erase(uSeq);
3690 if (simIterator->second.empty())
3693 subMap.
erase(simIterator);
3702 enum DatabaseType { Sqlite,
None };
3703 static auto const databaseType = [&]() -> DatabaseType {
3708 return DatabaseType::Sqlite;
3710 return DatabaseType::None;
3713 if (databaseType == DatabaseType::None)
3716 UNREACHABLE(
"xrpl::NetworkOPsImp::addAccountHistoryJob : no database");
3718 <<
"AccountHistory job for account "
3731 "AccountHistoryTxStream",
3732 [
this, dbType = databaseType, subInfo]() {
3733 auto const& accountId = subInfo.
index_->accountId_;
3734 auto& lastLedgerSeq = subInfo.
index_->historyLastLedgerSeq_;
3735 auto& txHistoryIndex = subInfo.
index_->historyTxIndex_;
3738 <<
"AccountHistory job for account " <<
toBase58(accountId)
3739 <<
" started. lastLedgerSeq=" << lastLedgerSeq;
3749 auto stx = tx->getSTransaction();
3750 if (stx->getAccountID(sfAccount) == accountId &&
3751 stx->getSeqValue() == 1)
3755 for (
auto& node : meta->getNodes())
3757 if (node.getFieldU16(sfLedgerEntryType) != ltACCOUNT_ROOT)
3760 if (node.isFieldPresent(sfNewFields))
3762 if (
auto inner =
dynamic_cast<STObject const*
>(
3763 node.peekAtPField(sfNewFields));
3766 if (inner->isFieldPresent(sfAccount) &&
3767 inner->getAccountID(sfAccount) == accountId)
3779 bool unsubscribe) ->
bool {
3782 sptr->send(jvObj,
true);
3792 bool unsubscribe) ->
bool {
3796 sptr->getApiVersion(),
3797 [&](
Json::Value const& jv) { sptr->send(jv,
true); });
3820 accountId, minLedger, maxLedger, marker, 0,
true};
3821 return db->newestAccountTxPage(options);
3826 "xrpl::NetworkOPsImp::addAccountHistoryJob : "
3827 "getMoreTxns : invalid database type");
3837 while (lastLedgerSeq >= 2 && !subInfo.
index_->stopHistorical_)
3839 int feeChargeCount = 0;
3848 <<
"AccountHistory job for account "
3849 <<
toBase58(accountId) <<
" no InfoSub. Fee charged "
3850 << feeChargeCount <<
" times.";
3855 auto startLedgerSeq =
3856 (lastLedgerSeq > 1024 + 2 ? lastLedgerSeq - 1024 : 2);
3858 <<
"AccountHistory job for account " <<
toBase58(accountId)
3859 <<
", working on ledger range [" << startLedgerSeq <<
","
3860 << lastLedgerSeq <<
"]";
3862 auto haveRange = [&]() ->
bool {
3865 auto haveSomeValidatedLedgers =
3867 validatedMin, validatedMax);
3869 return haveSomeValidatedLedgers &&
3870 validatedMin <= startLedgerSeq &&
3871 lastLedgerSeq <= validatedMax;
3877 <<
"AccountHistory reschedule job for account "
3878 <<
toBase58(accountId) <<
", incomplete ledger range ["
3879 << startLedgerSeq <<
"," << lastLedgerSeq <<
"]";
3885 while (!subInfo.
index_->stopHistorical_)
3888 getMoreTxns(startLedgerSeq, lastLedgerSeq, marker);
3893 "xrpl::NetworkOPsImp::addAccountHistoryJob : "
3894 "getMoreTxns failed");
3896 <<
"AccountHistory job for account "
3897 <<
toBase58(accountId) <<
" getMoreTxns failed.";
3903 auto const& txns = dbResult->first;
3904 marker = dbResult->second;
3905 size_t num_txns = txns.size();
3906 for (
size_t i = 0; i < num_txns; ++i)
3908 auto const& [tx, meta] = txns[i];
3913 <<
"AccountHistory job for account "
3914 <<
toBase58(accountId) <<
" empty tx or meta.";
3925 "xrpl::NetworkOPsImp::addAccountHistoryJob : "
3926 "getLedgerBySeq failed");
3928 <<
"AccountHistory job for account "
3929 <<
toBase58(accountId) <<
" no ledger.";
3935 tx->getSTransaction();
3940 "NetworkOPsImp::addAccountHistoryJob : "
3941 "getSTransaction failed");
3943 <<
"AccountHistory job for account "
3945 <<
" getSTransaction failed.";
3952 auto const trR = meta->getResultTER();
3954 transJson(stTxn, trR,
true, curTxLedger, mRef);
3957 jss::account_history_tx_index, txHistoryIndex--);
3958 if (i + 1 == num_txns ||
3959 txns[i + 1].first->getLedger() != tx->getLedger())
3960 jvTx.
set(jss::account_history_boundary,
true);
3962 if (isFirstTx(tx, meta))
3964 jvTx.
set(jss::account_history_tx_first,
true);
3965 sendMultiApiJson(jvTx,
false);
3968 <<
"AccountHistory job for account "
3970 <<
" done, found last tx.";
3975 sendMultiApiJson(jvTx,
false);
3982 <<
"AccountHistory job for account "
3984 <<
" paging, marker=" << marker->ledgerSeq <<
":"
3993 if (!subInfo.
index_->stopHistorical_)
3995 lastLedgerSeq = startLedgerSeq - 1;
3996 if (lastLedgerSeq <= 1)
3999 <<
"AccountHistory job for account "
4001 <<
" done, reached genesis ledger.";
4014 subInfo.
index_->separationLedgerSeq_ = ledger->seq();
4015 auto const& accountId = subInfo.
index_->accountId_;
4017 if (!ledger->exists(accountKeylet))
4020 <<
"subAccountHistoryStart, no account " <<
toBase58(accountId)
4021 <<
", no need to add AccountHistory job.";
4026 if (
auto const sleAcct = ledger->read(accountKeylet); sleAcct)
4028 if (sleAcct->getFieldU32(sfSequence) == 1)
4031 <<
"subAccountHistoryStart, genesis account "
4033 <<
" does not have tx, no need to add AccountHistory job.";
4041 "xrpl::NetworkOPsImp::subAccountHistoryStart : failed to "
4042 "access genesis account");
4047 subInfo.
index_->historyLastLedgerSeq_ = ledger->seq();
4048 subInfo.
index_->haveHistorical_ =
true;
4051 <<
"subAccountHistoryStart, add AccountHistory job: accountId="
4052 <<
toBase58(accountId) <<
", currentLedgerSeq=" << ledger->seq();
4062 if (!isrListener->insertSubAccountHistory(accountId))
4065 <<
"subAccountHistory, already subscribed to account "
4077 inner.
emplace(isrListener->getSeq(), ahi);
4083 simIterator->second.emplace(isrListener->getSeq(), ahi);
4097 <<
"subAccountHistory, no validated ledger yet, delay start";
4110 isrListener->deleteSubAccountHistory(account);
4124 auto& subInfoMap = simIterator->second;
4125 auto subInfoIter = subInfoMap.find(seq);
4126 if (subInfoIter != subInfoMap.end())
4128 subInfoIter->second.index_->stopHistorical_ =
true;
4133 simIterator->second.erase(seq);
4134 if (simIterator->second.empty())
4140 <<
"unsubAccountHistory, account " <<
toBase58(account)
4141 <<
", historyOnly = " << (historyOnly ?
"true" :
"false");
4149 listeners->addSubscriber(isrListener);
4153 UNREACHABLE(
"xrpl::NetworkOPsImp::subBook : null book listeners");
4163 listeners->removeSubscriber(uSeq);
4175 m_standalone,
"xrpl::NetworkOPsImp::acceptLedger : is standalone");
4178 Throw<std::runtime_error>(
4179 "Operation only possible in STANDALONE mode.");
4194 jvResult[jss::ledger_index] = lpClosed->header().seq;
4195 jvResult[jss::ledger_hash] =
to_string(lpClosed->header().hash);
4197 lpClosed->header().closeTime.time_since_epoch().count());
4198 if (!lpClosed->rules().enabled(featureXRPFees))
4200 jvResult[jss::fee_base] = lpClosed->fees().base.jsonClipped();
4201 jvResult[jss::reserve_base] = lpClosed->fees().reserve.jsonClipped();
4202 jvResult[jss::reserve_inc] = lpClosed->fees().increment.jsonClipped();
4208 jvResult[jss::validated_ledgers] =
4214 .emplace(isrListener->getSeq(), isrListener)
4224 .emplace(isrListener->getSeq(), isrListener)
4250 .emplace(isrListener->getSeq(), isrListener)
4278 jvResult[jss::random] =
to_string(uRandom);
4280 jvResult[jss::load_base] = feeTrack.getLoadBase();
4281 jvResult[jss::load_factor] = feeTrack.getLoadFactor();
4282 jvResult[jss::hostid] =
getHostId(admin);
4283 jvResult[jss::pubkey_node] =
4288 .emplace(isrListener->getSeq(), isrListener)
4306 .emplace(isrListener->getSeq(), isrListener)
4324 .emplace(isrListener->getSeq(), isrListener)
4342 .emplace(isrListener->getSeq(), isrListener)
4366 .emplace(isrListener->getSeq(), isrListener)
4384 .emplace(isrListener->getSeq(), isrListener)
4432 if (map.find(pInfo->getSeq()) != map.end())
4439#ifndef USE_NEW_BOOK_PAGE
4450 unsigned int iLimit,
4460 uint256 uTipIndex = uBookBase;
4464 stream <<
"getBookPage:" << book;
4465 stream <<
"getBookPage: uBookBase=" << uBookBase;
4466 stream <<
"getBookPage: uBookEnd=" << uBookEnd;
4467 stream <<
"getBookPage: uTipIndex=" << uTipIndex;
4476 bool bDirectAdvance =
true;
4480 unsigned int uBookEntry;
4486 while (!bDone && iLimit-- > 0)
4490 bDirectAdvance =
false;
4494 auto const ledgerIndex = view.
succ(uTipIndex, uBookEnd);
4498 sleOfferDir.
reset();
4507 uTipIndex = sleOfferDir->key();
4510 cdirFirst(view, uTipIndex, sleOfferDir, uBookEntry, offerIndex);
4513 <<
"getBookPage: uTipIndex=" << uTipIndex;
4515 <<
"getBookPage: offerIndex=" << offerIndex;
4525 auto const uOfferOwnerID = sleOffer->getAccountID(sfAccount);
4526 auto const& saTakerGets = sleOffer->getFieldAmount(sfTakerGets);
4527 auto const& saTakerPays = sleOffer->getFieldAmount(sfTakerPays);
4529 bool firstOwnerOffer(
true);
4535 saOwnerFunds = saTakerGets;
4537 else if (bGlobalFreeze)
4545 auto umBalanceEntry = umBalance.
find(uOfferOwnerID);
4546 if (umBalanceEntry != umBalance.
end())
4550 saOwnerFunds = umBalanceEntry->second;
4551 firstOwnerOffer =
false;
4565 if (saOwnerFunds < beast::zero)
4569 saOwnerFunds.
clear();
4577 STAmount saOwnerFundsLimit = saOwnerFunds;
4589 saOwnerFundsLimit =
divide(saOwnerFunds, offerRate);
4592 if (saOwnerFundsLimit >= saTakerGets)
4595 saTakerGetsFunded = saTakerGets;
4601 saTakerGetsFunded = saOwnerFundsLimit;
4603 saTakerGetsFunded.
setJson(jvOffer[jss::taker_gets_funded]);
4607 saTakerGetsFunded, saDirRate, saTakerPays.
issue()))
4608 .setJson(jvOffer[jss::taker_pays_funded]);
4614 saOwnerFunds,
multiply(saTakerGetsFunded, offerRate));
4616 umBalance[uOfferOwnerID] = saOwnerFunds - saOwnerPays;
4620 jvOf[jss::quality] = saDirRate.
getText();
4622 if (firstOwnerOffer)
4623 jvOf[jss::owner_funds] = saOwnerFunds.
getText();
4630 if (!
cdirNext(view, uTipIndex, sleOfferDir, uBookEntry, offerIndex))
4632 bDirectAdvance =
true;
4637 <<
"getBookPage: offerIndex=" << offerIndex;
4657 unsigned int iLimit,
4665 MetaView lesActive(lpLedger,
tapNONE,
true);
4666 OrderBookIterator obIterator(lesActive, book);
4670 bool const bGlobalFreeze = lesActive.isGlobalFrozen(book.
out.
account) ||
4671 lesActive.isGlobalFrozen(book.
in.
account);
4673 while (iLimit-- > 0 && obIterator.nextOffer())
4678 auto const uOfferOwnerID = sleOffer->getAccountID(sfAccount);
4679 auto const& saTakerGets = sleOffer->getFieldAmount(sfTakerGets);
4680 auto const& saTakerPays = sleOffer->getFieldAmount(sfTakerPays);
4681 STAmount saDirRate = obIterator.getCurrentRate();
4687 saOwnerFunds = saTakerGets;
4689 else if (bGlobalFreeze)
4697 auto umBalanceEntry = umBalance.
find(uOfferOwnerID);
4699 if (umBalanceEntry != umBalance.
end())
4703 saOwnerFunds = umBalanceEntry->second;
4709 saOwnerFunds = lesActive.accountHolds(
4715 if (saOwnerFunds.isNegative())
4719 saOwnerFunds.zero();
4726 STAmount saTakerGetsFunded;
4727 STAmount saOwnerFundsLimit = saOwnerFunds;
4739 saOwnerFundsLimit =
divide(saOwnerFunds, offerRate);
4742 if (saOwnerFundsLimit >= saTakerGets)
4745 saTakerGetsFunded = saTakerGets;
4750 saTakerGetsFunded = saOwnerFundsLimit;
4752 saTakerGetsFunded.setJson(jvOffer[jss::taker_gets_funded]);
4758 multiply(saTakerGetsFunded, saDirRate, saTakerPays.issue()))
4759 .setJson(jvOffer[jss::taker_pays_funded]);
4762 STAmount saOwnerPays = (
parityRate == offerRate)
4765 saOwnerFunds,
multiply(saTakerGetsFunded, offerRate));
4767 umBalance[uOfferOwnerID] = saOwnerFunds - saOwnerPays;
4769 if (!saOwnerFunds.isZero() || uOfferOwnerID == uTakerID)
4773 jvOf[jss::quality] = saDirRate.
getText();
4788 auto const current = std::chrono::duration_cast<std::chrono::microseconds>(
4828 ++counters_[
static_cast<std::size_t>(om)].transitions;
4830 counters_[
static_cast<std::size_t>(om)].transitions == 1)
4832 initialSyncUs_ = std::chrono::duration_cast<std::chrono::microseconds>(
4833 now - processStart_)
4837 std::chrono::duration_cast<std::chrono::microseconds>(now - start_);
4846 auto [counters, mode, start, initialSync] = getCounterData();
4847 auto const current = std::chrono::duration_cast<std::chrono::microseconds>(
4857 auto& state = obj[jss::state_accounting][
states_[i]];
4858 state[jss::transitions] =
std::to_string(counters[i].transitions);
4859 state[jss::duration_us] =
std::to_string(counters[i].dur.count());
4863 obj[jss::initial_sync_duration_us] =
std::to_string(initialSync);
4878 boost::asio::io_context& io_svc,
T back_inserter(T... args)
Decorator for streaming out compact json.
Lightweight wrapper to tag static string.
Value & append(Value const &value)
Append value to array at the end.
bool isMember(char const *key) const
Return true if the object has a member named key.
Value get(UInt index, Value const &defaultValue) const
If the array contains at least index+1 elements, returns the element value, otherwise returns default...
A generic endpoint for log messages.
Stream trace() const
Severity stream access functions.
A metric for measuring an integral value.
void set(value_type value) const
Set the value on the gauge.
A reference to a handler for performing polled collection.
A transaction that is in a closed ledger.
TxMeta const & getMeta() const
boost::container::flat_set< AccountID > const & getAffected() const
std::shared_ptr< STTx const > const & getTxn() const
virtual std::optional< NetClock::time_point > firstUnsupportedExpected() const =0
virtual perf::PerfLog & getPerfLog()=0
virtual TaggedCache< uint256, AcceptedLedger > & getAcceptedLedgerCache()=0
virtual std::chrono::milliseconds getIOLatency()=0
virtual Cluster & cluster()=0
virtual Config & config()=0
virtual InboundLedgers & getInboundLedgers()=0
virtual LoadFeeTrack & getFeeTrack()=0
virtual LedgerMaster & getLedgerMaster()=0
virtual beast::Journal journal(std::string const &name)=0
virtual NodeStore::Database & getNodeStore()=0
virtual ServerHandler & getServerHandler()=0
virtual TimeKeeper & timeKeeper()=0
virtual std::optional< PublicKey const > getValidationPublicKey() const =0
virtual OpenLedger & openLedger()=0
virtual AmendmentTable & getAmendmentTable()=0
virtual OrderBookDB & getOrderBookDB()=0
virtual Overlay & overlay()=0
virtual JobQueue & getJobQueue()=0
virtual ManifestCache & validatorManifests()=0
virtual RelationalDatabase & getRelationalDatabase()=0
virtual std::pair< PublicKey, SecretKey > const & nodeIdentity()=0
virtual ValidatorList & validators()=0
bool exists(std::string const &name) const
Returns true if a section with the given name exists.
Section & section(std::string const &name)
Returns the section with the given name.
Holds transactions which were deferred to the next pass of consensus.
The role of a ClosureCounter is to assist in shutdown by letting callers wait for the completion of c...
std::uint32_t getLoadFee() const
NetClock::time_point getReportTime() const
PublicKey const & identity() const
std::string const & name() const
std::size_t size() const
The number of nodes in the cluster list.
std::string SERVER_DOMAIN
int RELAY_UNTRUSTED_VALIDATIONS
static constexpr std::uint32_t FEE_UNITS_DEPRECATED
virtual Json::Value getInfo()=0
virtual void clearFailures()=0
std::shared_ptr< InfoSub > pointer
A pool of threads to perform work.
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Json::Value getJson(int c=0)
std::chrono::seconds getValidatedLedgerAge()
bool getValidatedRange(std::uint32_t &minVal, std::uint32_t &maxVal)
std::shared_ptr< Ledger const > getLedgerBySeq(std::uint32_t index)
bool haveValidated()
Whether we have ever fully validated a ledger.
std::size_t getFetchPackCacheSize() const
std::shared_ptr< Ledger const > getClosedLedger()
std::string getCompleteLedgers()
std::shared_ptr< Ledger const > getValidatedLedger()
std::shared_ptr< ReadView const > getPublishedLedger()
std::shared_ptr< ReadView const > getCurrentLedger()
Manages the current fee schedule.
std::uint32_t getClusterFee() const
std::uint32_t getLocalFee() const
std::uint32_t getRemoteFee() const
std::uint32_t getLoadFactor() const
std::uint32_t getLoadBase() const
void heartbeat()
Reset the stall detection timer.
PublicKey getMasterKey(PublicKey const &pk) const
Returns ephemeral signing key's master public key.
State accounting records two attributes for each possible server state: 1) Amount of time spent in ea...
void json(Json::Value &obj) const
Output state counters in JSON format.
std::chrono::steady_clock::time_point const processStart_
static std::array< Json::StaticString const, 5 > const states_
CounterData getCounterData() const
std::uint64_t initialSyncUs_
std::array< Counters, 5 > counters_
void mode(OperatingMode om)
Record state transition.
std::chrono::steady_clock::time_point start_
Transaction with input flags and results to be applied in batches.
std::shared_ptr< Transaction > const transaction
TransactionStatus(std::shared_ptr< Transaction > t, bool a, bool l, FailHard f)
std::string getHostId(bool forAdmin)
void reportConsensusStateChange(ConsensusPhase phase)
void addAccountHistoryJob(SubAccountHistoryInfoWeak subInfo)
void clearNeedNetworkLedger() override
ServerFeeSummary mLastFeeSummary
Json::Value getOwnerInfo(std::shared_ptr< ReadView const > lpLedger, AccountID const &account) override
DispatchState mDispatchState
std::size_t const minPeerCount_
static std::array< char const *, 5 > const states_
std::set< uint256 > pendingValidations_
void pubAccountTransaction(std::shared_ptr< ReadView const > const &ledger, AcceptedLedgerTx const &transaction, bool last)
ClosureCounter< void, boost::system::error_code const & > waitHandlerCounter_
MultiApiJson transJson(std::shared_ptr< STTx const > const &transaction, TER result, bool validated, std::shared_ptr< ReadView const > const &ledger, std::optional< std::reference_wrapper< TxMeta const > > meta)
bool unsubManifests(std::uint64_t uListener) override
void pubPeerStatus(std::function< Json::Value(void)> const &) override
void unsubAccount(InfoSub::ref ispListener, hash_set< AccountID > const &vnaAccountIDs, bool rt) override
bool subManifests(InfoSub::ref ispListener) override
void stateAccounting(Json::Value &obj) override
void pubLedger(std::shared_ptr< ReadView const > const &lpAccepted) override
SubInfoMapType mSubRTAccount
void subAccount(InfoSub::ref ispListener, hash_set< AccountID > const &vnaAccountIDs, bool rt) override
void transactionBatch()
Apply transactions in batches.
bool unsubRTTransactions(std::uint64_t uListener) override
void getBookPage(std::shared_ptr< ReadView const > &lpLedger, Book const &, AccountID const &uTakerID, bool const bProof, unsigned int iLimit, Json::Value const &jvMarker, Json::Value &jvResult) override
bool processTrustedProposal(RCLCxPeerPos proposal) override
error_code_i subAccountHistory(InfoSub::ref ispListener, AccountID const &account) override
subscribe an account's new transactions and retrieve the account's historical transactions
void subAccountHistoryStart(std::shared_ptr< ReadView const > const &ledger, SubAccountHistoryInfoWeak &subInfo)
void pubValidation(std::shared_ptr< STValidation > const &val) override
bool subBook(InfoSub::ref ispListener, Book const &) override
InfoSub::pointer addRpcSub(std::string const &strUrl, InfoSub::ref) override
void endConsensus(std::unique_ptr< std::stringstream > const &clog) override
std::atomic< OperatingMode > mMode
void setMode(OperatingMode om) override
void setAmendmentBlocked() override
void pubConsensus(ConsensusPhase phase)
std::recursive_mutex mSubLock
bool isNeedNetworkLedger() override
DispatchState
Synchronization states for transaction batches.
std::atomic< bool > needNetworkLedger_
boost::asio::steady_timer heartbeatTimer_
bool subConsensus(InfoSub::ref ispListener) override
bool unsubBook(std::uint64_t uListener, Book const &) override
bool unsubLedger(std::uint64_t uListener) override
bool checkLastClosedLedger(Overlay::PeerSequence const &, uint256 &networkClosed)
void pubProposedAccountTransaction(std::shared_ptr< ReadView const > const &ledger, std::shared_ptr< STTx const > const &transaction, TER result)
void unsubAccountHistoryInternal(std::uint64_t seq, AccountID const &account, bool historyOnly) override
void pubValidatedTransaction(std::shared_ptr< ReadView const > const &ledger, AcceptedLedgerTx const &transaction, bool last)
void switchLastClosedLedger(std::shared_ptr< Ledger const > const &newLCL)
std::optional< PublicKey > const validatorPK_
std::atomic< bool > amendmentBlocked_
void clearAmendmentWarned() override
void updateLocalTx(ReadView const &view) override
void clearLedgerFetch() override
void apply(std::unique_lock< std::mutex > &batchLock)
Attempt to apply transactions and post-process based on the results.
InfoSub::pointer findRpcSub(std::string const &strUrl) override
bool isAmendmentBlocked() override
std::string strOperatingMode(OperatingMode const mode, bool const admin) const override
std::unique_ptr< LocalTxs > m_localTX
void setStandAlone() override
void setNeedNetworkLedger() override
bool subServer(InfoSub::ref ispListener, Json::Value &jvResult, bool admin) override
void setTimer(boost::asio::steady_timer &timer, std::chrono::milliseconds const &expiry_time, std::function< void()> onExpire, std::function< void()> onError)
bool unsubServer(std::uint64_t uListener) override
SubAccountHistoryMapType mSubAccountHistory
void processClusterTimer()
bool unsubConsensus(std::uint64_t uListener) override
std::condition_variable mCond
void pubManifest(Manifest const &) override
void consensusViewChange() override
boost::asio::steady_timer accountHistoryTxTimer_
Json::Value getConsensusInfo() override
bool recvValidation(std::shared_ptr< STValidation > const &val, std::string const &source) override
void setUNLBlocked() override
bool unsubValidations(std::uint64_t uListener) override
bool subPeerStatus(InfoSub::ref ispListener) override
void doTransactionAsync(std::shared_ptr< Transaction > transaction, bool bUnlimited, FailHard failtype)
For transactions not submitted by a locally connected client, fire and forget.
ConsensusPhase mLastConsensusPhase
OperatingMode getOperatingMode() const override
std::optional< PublicKey > const validatorMasterPK_
void doTransactionSyncBatch(std::unique_lock< std::mutex > &lock, std::function< bool(std::unique_lock< std::mutex > const &)> retryCallback)
NetworkOPsImp(Application &app, NetworkOPs::clock_type &clock, bool standalone, std::size_t minPeerCount, bool start_valid, JobQueue &job_queue, LedgerMaster &ledgerMaster, ValidatorKeys const &validatorKeys, boost::asio::io_context &io_svc, beast::Journal journal, beast::insight::Collector::ptr const &collector)
std::array< SubMapType, SubTypes::sLastEntry > mStreamMaps
std::vector< TransactionStatus > mTransactions
bool tryRemoveRpcSub(std::string const &strUrl) override
bool beginConsensus(uint256 const &networkClosed, std::unique_ptr< std::stringstream > const &clog) override
void processHeartbeatTimer()
void doTransactionSync(std::shared_ptr< Transaction > transaction, bool bUnlimited, FailHard failType)
For transactions submitted directly by a client, apply batch of transactions and wait for this transa...
void submitTransaction(std::shared_ptr< STTx const > const &) override
void setAmendmentWarned() override
LedgerMaster & m_ledgerMaster
Json::Value getServerInfo(bool human, bool admin, bool counters) override
StateAccounting accounting_
bool subValidations(InfoSub::ref ispListener) override
void setAccountHistoryJobTimer(SubAccountHistoryInfoWeak subInfo)
bool subRTTransactions(InfoSub::ref ispListener) override
std::atomic< bool > unlBlocked_
bool unsubBookChanges(std::uint64_t uListener) override
void unsubAccountHistory(InfoSub::ref ispListener, AccountID const &account, bool historyOnly) override
unsubscribe an account's transactions
void setStateTimer() override
Called to initially start our timers.
std::size_t getLocalTxCount() override
bool preProcessTransaction(std::shared_ptr< Transaction > &transaction)
void processTransaction(std::shared_ptr< Transaction > &transaction, bool bUnlimited, bool bLocal, FailHard failType) override
Process transactions as they arrive from the network or which are submitted by clients.
bool unsubTransactions(std::uint64_t uListener) override
bool isAmendmentWarned() override
bool subTransactions(InfoSub::ref ispListener) override
std::mutex validationsMutex_
std::uint32_t acceptLedger(std::optional< std::chrono::milliseconds > consensusDelay) override
Accepts the current transaction tree, return the new ledger's sequence.
SubInfoMapType mSubAccount
void clearUNLBlocked() override
bool isUNLBlocked() override
void pubProposedTransaction(std::shared_ptr< ReadView const > const &ledger, std::shared_ptr< STTx const > const &transaction, TER result) override
std::atomic< bool > amendmentWarned_
boost::asio::steady_timer clusterTimer_
bool unsubPeerStatus(std::uint64_t uListener) override
void reportFeeChange() override
void processTransactionSet(CanonicalTXSet const &set) override
Process a set of transactions synchronously, and ensuring that they are processed in one batch.
void mapComplete(std::shared_ptr< SHAMap > const &map, bool fromAcquire) override
bool subLedger(InfoSub::ref ispListener, Json::Value &jvResult) override
bool isBlocked() override
~NetworkOPsImp() override
Json::Value getLedgerFetchInfo() override
void unsubAccountInternal(std::uint64_t seq, hash_set< AccountID > const &vnaAccountIDs, bool rt) override
bool subBookChanges(InfoSub::ref ispListener) override
Provides server functionality for clients.
void getCountsJson(Json::Value &obj)
std::shared_ptr< OpenView const > current() const
Returns a view to the current open ledger.
Writable ledger view that accumulates state and tx changes.
BookListeners::pointer getBookListeners(Book const &)
void processTxn(std::shared_ptr< ReadView const > const &ledger, AcceptedLedgerTx const &alTx, MultiApiJson const &jvObj)
BookListeners::pointer makeBookListeners(Book const &)
virtual std::uint64_t getPeerDisconnect() const =0
virtual std::optional< std::uint32_t > networkID() const =0
Returns the ID of the network this server is configured for, if any.
virtual std::uint64_t getPeerDisconnectCharges() const =0
virtual std::uint64_t getJqTransOverflow() const =0
virtual std::size_t size() const =0
Returns the number of active peers.
Manages the generic consensus algorithm for use by the RCL.
std::size_t prevProposers() const
Get the number of proposing peers that participated in the previous round.
void simulate(NetClock::time_point const &now, std::optional< std::chrono::milliseconds > consensusDelay)
Json::Value getJson(bool full) const
std::chrono::milliseconds prevRoundTime() const
Get duration of the previous round.
A peer's signed, proposed position for use in RCLConsensus.
PublicKey const & publicKey() const
Public key of peer that sent the proposal.
Represents a set of transactions in RCLConsensus.
Wraps a ledger instance for use in generic Validations LedgerTrie.
static std::string getWordFromBlob(void const *blob, size_t bytes)
Chooses a single dictionary word from the data.
Collects logging information.
std::unique_ptr< std::stringstream > const & ss()
virtual std::optional< key_type > succ(key_type const &key, std::optional< key_type > const &last=std::nullopt) const =0
Return the key of the next state item.
virtual std::shared_ptr< SLE const > read(Keylet const &k) const =0
Return the state item associated with a key.
Issue const & issue() const
std::string getText() const override
void setJson(Json::Value &) const
std::optional< T > get(std::string const &name) const
std::size_t size() const noexcept
void const * data() const noexcept
void setup(Setup const &setup, beast::Journal journal)
time_point now() const override
Returns the current time, using the server's clock.
std::chrono::seconds closeOffset() const
time_point closeTime() const
Returns the predicted close time, in network time.
Metrics getMetrics(OpenView const &view) const
Returns fee metrics in reference fee level units.
Validator keys and manifest as set in configuration file.
std::optional< PublicKey > localPublicKey() const
This function returns the local validator public key or a std::nullopt.
std::size_t quorum() const
Get quorum value for current trusted key set.
std::optional< TimeKeeper::time_point > expires() const
Return the time when the validator list will expire.
std::size_t count() const
Return the number of configured validator list sites.
std::optional< PublicKey > getTrustedKey(PublicKey const &identity) const
Returns master public key if public key is trusted.
Json::Value jsonClipped() const
constexpr double decimalXRP() const
static constexpr std::size_t size()
virtual Json::Value currentJson() const =0
Render currently executing jobs and RPC calls and durations in Json.
virtual Json::Value countersJson() const =0
Render performance counters in Json.
Automatically unlocks and re-locks a unique_lock object.
T emplace_back(T... args)
@ arrayValue
array value (ordered list)
@ objectValue
object value (collection of name/value pairs).
void rngfill(void *const buffer, std::size_t const bytes, Generator &g)
std::string const & getVersionString()
Server version.
std::optional< std::string > encodeCTID(uint32_t ledgerSeq, uint32_t txnIndex, uint32_t networkID) noexcept
Encodes ledger sequence, transaction index, and network ID into a CTID string.
Json::Value computeBookChanges(std::shared_ptr< L const > const &lpAccepted)
void insertMPTokenIssuanceID(Json::Value &response, std::shared_ptr< STTx const > const &transaction, TxMeta const &transactionMeta)
void insertDeliveredAmount(Json::Value &meta, ReadView const &, std::shared_ptr< STTx const > const &serializedTx, TxMeta const &)
Add a delivered_amount field to the meta input/output parameter.
void insertNFTSyntheticInJson(Json::Value &, std::shared_ptr< STTx const > const &, TxMeta const &)
Adds common synthetic fields to transaction-related JSON responses.
Charge const feeMediumBurdenRPC
TER valid(STTx const &tx, ReadView const &view, AccountID const &src, beast::Journal j)
Keylet offer(AccountID const &id, std::uint32_t seq) noexcept
An offer from an account.
Keylet account(AccountID const &id) noexcept
AccountID root.
Keylet page(uint256 const &root, std::uint64_t index=0) noexcept
A page in a directory.
Rate rate(Env &env, Account const &account, std::uint32_t const &seq)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
std::unique_ptr< FeeVote > make_FeeVote(FeeSetup const &setup, beast::Journal journal)
Create an instance of the FeeVote logic.
STAmount divide(STAmount const &amount, Rate const &rate)
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
bool isTerRetry(TER x) noexcept
csprng_engine & crypto_prng()
The default cryptographically secure PRNG.
std::optional< std::uint64_t > mulDiv(std::uint64_t value, std::uint64_t mul, std::uint64_t div)
Return value*mul/div accurately.
Json::Value getJson(LedgerFill const &fill)
Return a new Json::Value representing the ledger with given options.
constexpr std::uint32_t tfInnerBatchTxn
std::string to_string(base_uint< Bits, Tag > const &a)
std::string strHex(FwdIt begin, FwdIt end)
Rules makeRulesGivenLedger(DigestAwareReadView const &ledger, Rules const &current)
std::uint64_t getQuality(uint256 const &uBase)
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
FeeSetup setup_FeeVote(Section const &section)
Number root(Number f, unsigned d)
std::unique_ptr< NetworkOPs > make_NetworkOPs(Application &app, NetworkOPs::clock_type &clock, bool standalone, std::size_t minPeerCount, bool startvalid, JobQueue &job_queue, LedgerMaster &ledgerMaster, ValidatorKeys const &validatorKeys, boost::asio::io_context &io_svc, beast::Journal journal, beast::insight::Collector::ptr const &collector)
bool transResultInfo(TER code, std::string &token, std::string &text)
bool cdirNext(ReadView const &view, uint256 const &root, std::shared_ptr< SLE const > &page, unsigned int &index, uint256 &entry)
Returns the next entry in the directory, advancing the index.
STAmount multiply(STAmount const &amount, Rate const &rate)
static auto const genesisAccountId
Seed generateSeed(std::string const &passPhrase)
Generate a seed deterministically.
bool cdirFirst(ReadView const &view, uint256 const &root, std::shared_ptr< SLE const > &page, unsigned int &index, uint256 &entry)
Returns the first entry in the directory, advancing the index.
std::pair< PublicKey, SecretKey > generateKeyPair(KeyType type, Seed const &seed)
Generate a key pair deterministically.
STAmount accountHolds(ReadView const &view, AccountID const &account, Currency const &currency, AccountID const &issuer, FreezeHandling zeroIfFrozen, beast::Journal j)
@ current
This was a new validation and was added.
constexpr std::size_t maxPoppedTransactions
STAmount accountFunds(ReadView const &view, AccountID const &id, STAmount const &saDefault, FreezeHandling freezeHandling, beast::Journal j)
bool isGlobalFrozen(ReadView const &view, AccountID const &issuer)
STAmount amountFromQuality(std::uint64_t rate)
bool isTefFailure(TER x) noexcept
Rate transferRate(ReadView const &view, AccountID const &issuer)
Returns IOU issuer transfer fee as Rate.
auto constexpr muldiv_max
uint256 getQualityNext(uint256 const &uBase)
ConsensusPhase
Phases of consensus for a single ledger round.
send_if_pred< Predicate > send_if(std::shared_ptr< Message > const &m, Predicate const &f)
Helper function to aid in type deduction.
AccountID calcAccountID(PublicKey const &pk)
uint256 getBookBase(Book const &book)
Json::Value rpcError(error_code_i iError)
std::string to_string_iso(date::sys_time< Duration > tp)
std::unique_ptr< LocalTxs > make_LocalTxs()
std::pair< Validity, std::string > checkValidity(HashRouter &router, STTx const &tx, Rules const &rules, Config const &config)
Checks transaction signature and local checks.
bool isTelLocal(TER x) noexcept
@ ledgerMaster
ledger master data for signing
@ proposal
proposal for signing
bool isTesSuccess(TER x) noexcept
static std::uint32_t trunc32(std::uint64_t v)
void forAllApiVersions(Fn const &fn, Args &&... args)
static std::array< char const *, 5 > const stateNames
void handleNewValidation(Application &app, std::shared_ptr< STValidation > const &val, std::string const &source, BypassAccept const bypassAccept, std::optional< beast::Journal > j)
Handle a new validation.
OperatingMode
Specifies the mode under which the server believes it's operating.
@ TRACKING
convinced we agree with the network
@ DISCONNECTED
not ready to process requests
@ CONNECTED
convinced we are talking to the network
@ FULL
we have the ledger and can even validate
@ SYNCING
fallen slightly behind
std::shared_ptr< STTx const > sterilize(STTx const &stx)
Sterilize a transaction.
bool isTemMalformed(TER x) noexcept
Rate const parityRate
A transfer rate signifying a 1:1 exchange.
@ warnRPC_AMENDMENT_BLOCKED
@ warnRPC_UNSUPPORTED_MAJORITY
@ warnRPC_EXPIRED_VALIDATOR_LIST
T set_intersection(T... args)
PublicKey masterKey
The master key associated with this manifest.
std::string serialized
The manifest in serialized form.
Blob getMasterSignature() const
Returns manifest master key signature.
std::string domain
The domain, if one was specified in the manifest; empty otherwise.
std::optional< Blob > getSignature() const
Returns manifest signature.
std::optional< PublicKey > signingKey
The ephemeral key associated with this manifest.
std::uint32_t sequence
The sequence number of this manifest.
Server fees published on server subscription.
std::optional< TxQ::Metrics > em
bool operator!=(ServerFeeSummary const &b) const
bool operator==(ServerFeeSummary const &b) const
std::uint32_t loadBaseServer
ServerFeeSummary()=default
std::uint32_t loadFactorServer
decltype(initialSyncUs_) initialSyncUs
decltype(counters_) counters
std::chrono::microseconds dur
std::uint64_t transitions
beast::insight::Gauge syncing_duration
beast::insight::Gauge tracking_duration
beast::insight::Gauge connected_duration
beast::insight::Gauge tracking_transitions
Stats(Handler const &handler, beast::insight::Collector::ptr const &collector)
beast::insight::Gauge connected_transitions
beast::insight::Gauge full_transitions
beast::insight::Gauge disconnected_duration
beast::insight::Gauge syncing_transitions
beast::insight::Gauge disconnected_transitions
beast::insight::Gauge full_duration
beast::insight::Hook hook
std::int32_t historyTxIndex_
AccountID const accountId_
std::uint32_t forwardTxIndex_
std::uint32_t separationLedgerSeq_
std::uint32_t historyLastLedgerSeq_
SubAccountHistoryIndex(AccountID const &accountId)
std::atomic< bool > stopHistorical_
std::shared_ptr< SubAccountHistoryIndex > index_
std::shared_ptr< SubAccountHistoryIndex > index_
Represents a transfer rate.
Data format for exchanging consumption information across peers.
std::vector< Item > items
Changes in trusted nodes after updating validator list.
hash_set< NodeID > removed
Structure returned by TxQ::getMetrics, expressed in reference fee level units.
void set(char const *key, auto const &v)
IsMemberResult isMember(char const *key) const
Select all peers (except optional excluded) that are in our cluster.
Sends a message to all peers.
T time_since_epoch(T... args)