3#include <xrpl/basics/Log.h>
4#include <xrpl/basics/UnorderedContainers.h>
5#include <xrpl/basics/chrono.h>
6#include <xrpl/beast/clock/abstract_clock.h>
7#include <xrpl/beast/insight/Insight.h>
8#include <xrpl/beast/utility/PropertyStream.h>
9#include <xrpl/beast/utility/instrumentation.h>
10#include <xrpl/json/json_value.h>
11#include <xrpl/protocol/jss.h>
12#include <xrpl/resource/Fees.h>
13#include <xrpl/resource/Gossip.h>
14#include <xrpl/resource/detail/Import.h>
32 warn = collector->makeMeter(
"warn");
33 drop = collector->makeMeter(
"drop");
92 Entry* entry(
nullptr);
96 auto [resultIt, resultInserted] =
table_.emplace(
101 entry = &resultIt->second;
102 entry->key = &resultIt->first;
104 if (entry->refcount == 1)
114 JLOG(
journal_.debug()) <<
"New inbound endpoint " << *entry;
122 Entry* entry(
nullptr);
126 auto [resultIt, resultInserted] =
table_.emplace(
131 entry = &resultIt->second;
132 entry->key = &resultIt->first;
134 if (entry->refcount == 1)
142 JLOG(
journal_.debug()) <<
"New outbound endpoint " << *entry;
155 Entry* entry(
nullptr);
159 auto [resultIt, resultInserted] =
table_.emplace(
164 entry = &resultIt->second;
165 entry->key = &resultIt->first;
167 if (entry->refcount == 1)
175 JLOG(
journal_.debug()) <<
"New unlimited endpoint " << *entry;
197 int const localBalance = inboundEntry.localBalance.value(now);
198 if ((localBalance + inboundEntry.remoteBalance) >= threshold)
201 entry[jss::local] = localBalance;
202 entry[jss::remote] = inboundEntry.remoteBalance;
203 entry[jss::type] =
"inbound";
208 int const localBalance = outboundEntry.localBalance.value(now);
209 if ((localBalance + outboundEntry.remoteBalance) >= threshold)
212 entry[jss::local] = localBalance;
213 entry[jss::remote] = outboundEntry.remoteBalance;
214 entry[jss::type] =
"outbound";
217 for (
auto& adminEntry :
admin_)
219 int const localBalance = adminEntry.localBalance.value(now);
220 if ((localBalance + adminEntry.remoteBalance) >= threshold)
223 entry[jss::local] = localBalance;
224 entry[jss::remote] = adminEntry.remoteBalance;
225 entry[jss::type] =
"admin";
245 item.
balance = inboundEntry.localBalance.value(now);
249 gossip.
items.push_back(item);
261 auto const elapsed =
clock_.now();
272 Import& next(resultIt->second);
274 next.items.reserve(gossip.
items.size());
276 for (
auto const& gossipItem : gossip.
items)
279 item.
balance = gossipItem.balance;
282 next.items.push_back(item);
292 next.items.reserve(gossip.
items.size());
293 for (
auto const& gossipItem : gossip.
items)
296 item.
balance = gossipItem.balance;
299 next.items.push_back(item);
302 Import& prev(resultIt->second);
303 for (
auto& item : prev.items)
305 item.consumer.entry().remoteBalance -= item.balance;
322 auto const elapsed =
clock_.now();
326 if (iter->whenExpires <= elapsed)
328 JLOG(
journal_.debug()) <<
"Expired " << *iter;
329 auto tableIter =
table_.find(*iter->key);
342 Import&
import(iter->second);
343 if (iter->second.whenExpires <= elapsed)
345 for (
auto itemIter(
import.items.begin()); itemIter !=
import.items.end();
348 itemIter->consumer.entry().remoteBalance -= itemIter->balance;
379 Entry& entry(iter->second);
380 XRPL_ASSERT(entry.refcount == 0,
"xrpl::Resource::Logic::erase : entry not used");
396 if (--entry.refcount == 0)
398 JLOG(
journal_.debug()) <<
"Inactive " << entry;
400 switch (entry.key->kind)
414 "xrpl::Resource::Logic::release : invalid entry "
431 kFeeLogAsWarn > kFeeLogAsInfo && kFeeLogAsInfo > kFeeLogAsDebug && kFeeLogAsDebug > 10);
434 if (cost >= kFeeLogAsWarn)
435 return journal.warn();
436 if (cost >= kFeeLogAsInfo)
437 return journal.info();
438 if (cost >= kFeeLogAsDebug)
439 return journal.debug();
440 return journal.trace();
443 if (!context.empty())
444 context =
" (" + context +
")";
449 JLOG(kGetStream(fee.
cost(),
journal_)) <<
"Charging " << entry <<
" for " << fee << context;
456 if (entry.isUnlimited())
461 auto const elapsed =
clock_.now();
466 entry.lastWarningTime = elapsed;
470 JLOG(
journal_.info()) <<
"Load warning: " << entry;
479 if (entry.isUnlimited())
485 int const balance(entry.balance(now));
488 JLOG(
journal_.warn()) <<
"Consumer entry " << entry <<
" dropped with balance "
505 return entry.balance(
clock_.now());
516 for (
auto& entry : list)
519 if (entry.refcount != 0)
520 item[
"count"] = entry.refcount;
521 item[
"name"] = entry.toString();
522 item[
"balance"] = entry.balance(now);
523 if (entry.remoteBalance != 0)
524 item[
"remote_balance"] = entry.remoteBalance;
std::chrono::steady_clock::time_point time_point
virtual time_point now() const =0
Returns the current time.
A version-independent IP address and port combination.
Address const & address() const
Returns the address portion of this endpoint.
Endpoint atPort(Port port) const
Returns a new Endpoint with a different port.
A generic endpoint for log messages.
Intrusive doubly linked list.
std::shared_ptr< Collector > ptr
A metric for measuring an integral value.
value_type cost() const
Return the cost of the charge in Resource::Manager units.
int value_type
The type used to hold a consumption charge.
An endpoint that consumes resources.
Consumer newInboundEndpoint(beast::IP::Endpoint const &address)
Consumer newOutboundEndpoint(beast::IP::Endpoint const &address)
void acquire(Entry &entry)
std::recursive_mutex lock_
int balance(Entry &entry)
EntryIntrusiveList inbound_
void release(Entry &entry)
void erase(Table::iterator iter)
Consumer newUnlimitedEndpoint(beast::IP::Endpoint const &address)
Create endpoint that should not have resource limits applied.
EntryIntrusiveList admin_
Disposition charge(Entry &entry, Charge const &fee, std::string context={})
static void writeList(clock_type::time_point const now, beast::PropertyStream::Set &items, EntryIntrusiveList &list)
void importConsumers(std::string const &origin, Gossip const &gossip)
static Disposition disposition(int balance)
EntryIntrusiveList outbound_
void onWrite(beast::PropertyStream::Map &map)
hash_map< std::string, Import > Imports
EntryIntrusiveList inactive_
json::Value getJson(int threshold)
Returns a json::ValueType::Object.
bool disconnect(Entry &entry)
hash_map< Key, Entry, Key::Hasher, Key::KeyEqual > Table
beast::List< Entry > EntryIntrusiveList
Logic(beast::insight::Collector::ptr const &collector, clock_type &clock, beast::Journal journal)
@ Object
object value (collection of name/value pairs).
Disposition
The disposition of a consumer after applying a load charge.
@ Warn
Consumer should be disconnected for excess consumption.
static constexpr std::chrono::seconds kSecondsUntilExpiration
static constexpr auto kMinimumGossipBalance
static constexpr auto kDropThreshold
static constexpr auto kWarningThreshold
Tunable constants.
static constexpr std::chrono::seconds kGossipExpirationSeconds
beast::AbstractClock< std::chrono::steady_clock > Stopwatch
A clock for measuring elapsed time.
std::unordered_map< Key, Value, Hash, Pred, Allocator > hash_map
Describes a single consumer.
beast::IP::Endpoint address
Data format for exchanging consumption information across peers.
std::vector< Item > items
A set of imported consumer data from a gossip origin.
Stats(beast::insight::Collector::ptr const &collector)
beast::insight::Meter warn
beast::insight::Meter drop