xrpld
Loading...
Searching...
No Matches
include/xrpl/resource/detail/Logic.h
1#pragma once
2
3#include <xrpl/basics/Log.h>
4#include <xrpl/basics/UnorderedContainers.h>
5#include <xrpl/basics/chrono.h>
6#include <xrpl/beast/clock/abstract_clock.h>
7#include <xrpl/beast/insight/Insight.h>
8#include <xrpl/beast/utility/PropertyStream.h>
9#include <xrpl/beast/utility/instrumentation.h>
10#include <xrpl/json/json_value.h>
11#include <xrpl/protocol/jss.h>
12#include <xrpl/resource/Fees.h>
13#include <xrpl/resource/Gossip.h>
14#include <xrpl/resource/detail/Import.h>
15
16#include <mutex>
17
18namespace xrpl::Resource {
19
20class Logic
21{
22private:
27
28 struct Stats
29 {
31 {
32 warn = collector->makeMeter("warn");
33 drop = collector->makeMeter("drop");
34 }
35
38 };
39
43
45
46 // Table of all entries
48
49 // Because the following are intrusive lists, a given Entry may be in
50 // at most list at a given instant. The Entry must be removed from
51 // one list before placing it in another.
52
53 // List of all active inbound entries
55
56 // List of all active outbound entries
58
59 // List of all active admin entries
61
62 // List of all inactive entries
64
65 // All imported gossip data
67
68 //--------------------------------------------------------------------------
69public:
71 beast::insight::Collector::ptr const& collector,
72 clock_type& clock,
73 beast::Journal journal)
74 : stats_(collector), clock_(clock), journal_(journal)
75 {
76 }
77
79 {
80 // These have to be cleared before the Logic is destroyed
81 // since their destructors call back into the class.
82 // Order matters here as well, the import table has to be
83 // destroyed before the consumer table.
84 //
85 importTable_.clear();
86 table_.clear();
87 }
88
91 {
92 Entry* entry(nullptr);
93
94 {
95 std::scoped_lock const _(lock_);
96 auto [resultIt, resultInserted] = table_.emplace(
98 std::make_tuple(Kind::Inbound, address.atPort(0)), // Key
99 std::make_tuple(clock_.now())); // Entry
100
101 entry = &resultIt->second;
102 entry->key = &resultIt->first;
103 ++entry->refcount;
104 if (entry->refcount == 1)
105 {
106 if (!resultInserted)
107 {
108 inactive_.erase(inactive_.iteratorTo(*entry));
109 }
110 inbound_.pushBack(*entry);
111 }
112 }
113
114 JLOG(journal_.debug()) << "New inbound endpoint " << *entry;
115
116 return Consumer(*this, *entry);
117 }
118
121 {
122 Entry* entry(nullptr);
123
124 {
125 std::scoped_lock const _(lock_);
126 auto [resultIt, resultInserted] = table_.emplace(
128 std::make_tuple(Kind::Outbound, address), // Key
129 std::make_tuple(clock_.now())); // Entry
130
131 entry = &resultIt->second;
132 entry->key = &resultIt->first;
133 ++entry->refcount;
134 if (entry->refcount == 1)
135 {
136 if (!resultInserted)
137 inactive_.erase(inactive_.iteratorTo(*entry));
138 outbound_.pushBack(*entry);
139 }
140 }
141
142 JLOG(journal_.debug()) << "New outbound endpoint " << *entry;
143
144 return Consumer(*this, *entry);
145 }
146
154 {
155 Entry* entry(nullptr);
156
157 {
158 std::scoped_lock const _(lock_);
159 auto [resultIt, resultInserted] = table_.emplace(
161 std::make_tuple(Kind::Unlimited, address.atPort(1)), // Key
162 std::make_tuple(clock_.now())); // Entry
163
164 entry = &resultIt->second;
165 entry->key = &resultIt->first;
166 ++entry->refcount;
167 if (entry->refcount == 1)
168 {
169 if (!resultInserted)
170 inactive_.erase(inactive_.iteratorTo(*entry));
171 admin_.pushBack(*entry);
172 }
173 }
174
175 JLOG(journal_.debug()) << "New unlimited endpoint " << *entry;
176
177 return Consumer(*this, *entry);
178 }
179
182 {
184 }
185
188 getJson(int threshold)
189 {
190 clock_type::time_point const now(clock_.now());
191
193 std::scoped_lock const _(lock_);
194
195 for (auto& inboundEntry : inbound_)
196 {
197 int const localBalance = inboundEntry.localBalance.value(now);
198 if ((localBalance + inboundEntry.remoteBalance) >= threshold)
199 {
200 json::Value& entry = (ret[inboundEntry.toString()] = json::ValueType::Object);
201 entry[jss::local] = localBalance;
202 entry[jss::remote] = inboundEntry.remoteBalance;
203 entry[jss::type] = "inbound";
204 }
205 }
206 for (auto& outboundEntry : outbound_)
207 {
208 int const localBalance = outboundEntry.localBalance.value(now);
209 if ((localBalance + outboundEntry.remoteBalance) >= threshold)
210 {
211 json::Value& entry = (ret[outboundEntry.toString()] = json::ValueType::Object);
212 entry[jss::local] = localBalance;
213 entry[jss::remote] = outboundEntry.remoteBalance;
214 entry[jss::type] = "outbound";
215 }
216 }
217 for (auto& adminEntry : admin_)
218 {
219 int const localBalance = adminEntry.localBalance.value(now);
220 if ((localBalance + adminEntry.remoteBalance) >= threshold)
221 {
222 json::Value& entry = (ret[adminEntry.toString()] = json::ValueType::Object);
223 entry[jss::local] = localBalance;
224 entry[jss::remote] = adminEntry.remoteBalance;
225 entry[jss::type] = "admin";
226 }
227 }
228
229 return ret;
230 }
231
232 Gossip
234 {
235 clock_type::time_point const now(clock_.now());
236
237 Gossip gossip;
238 std::scoped_lock const _(lock_);
239
240 gossip.items.reserve(inbound_.size());
241
242 for (auto& inboundEntry : inbound_)
243 {
244 Gossip::Item item;
245 item.balance = inboundEntry.localBalance.value(now);
246 if (item.balance >= kMinimumGossipBalance)
247 {
248 item.address = inboundEntry.key->address;
249 gossip.items.push_back(item);
250 }
251 }
252
253 return gossip;
254 }
255
256 //--------------------------------------------------------------------------
257
258 void
259 importConsumers(std::string const& origin, Gossip const& gossip)
260 {
261 auto const elapsed = clock_.now();
262 {
263 std::scoped_lock const _(lock_);
264 auto [resultIt, resultInserted] = importTable_.emplace(
266 std::make_tuple(origin), // Key
267 std::make_tuple(clock_.now().time_since_epoch().count())); // Import
268
269 if (resultInserted)
270 {
271 // This is a new import
272 Import& next(resultIt->second);
273 next.whenExpires = elapsed + kGossipExpirationSeconds;
274 next.items.reserve(gossip.items.size());
275
276 for (auto const& gossipItem : gossip.items)
277 {
278 Import::Item item;
279 item.balance = gossipItem.balance;
280 item.consumer = newInboundEndpoint(gossipItem.address);
281 item.consumer.entry().remoteBalance += item.balance;
282 next.items.push_back(item);
283 }
284 }
285 else
286 {
287 // Previous import exists so add the new remote
288 // balances and then deduct the old remote balances.
289
290 Import next;
291 next.whenExpires = elapsed + kGossipExpirationSeconds;
292 next.items.reserve(gossip.items.size());
293 for (auto const& gossipItem : gossip.items)
294 {
295 Import::Item item;
296 item.balance = gossipItem.balance;
297 item.consumer = newInboundEndpoint(gossipItem.address);
298 item.consumer.entry().remoteBalance += item.balance;
299 next.items.push_back(item);
300 }
301
302 Import& prev(resultIt->second);
303 for (auto& item : prev.items)
304 {
305 item.consumer.entry().remoteBalance -= item.balance;
306 }
307
308 std::swap(next, prev);
309 }
310 }
311 }
312
313 //--------------------------------------------------------------------------
314
315 // Called periodically to expire entries and groom the table.
316 //
317 void
319 {
320 std::scoped_lock const _(lock_);
321
322 auto const elapsed = clock_.now();
323
324 for (auto iter(inactive_.begin()); iter != inactive_.end();)
325 {
326 if (iter->whenExpires <= elapsed)
327 {
328 JLOG(journal_.debug()) << "Expired " << *iter;
329 auto tableIter = table_.find(*iter->key);
330 ++iter;
331 erase(tableIter);
332 }
333 else
334 {
335 break;
336 }
337 }
338
339 auto iter = importTable_.begin();
340 while (iter != importTable_.end())
341 {
342 Import& import(iter->second);
343 if (iter->second.whenExpires <= elapsed)
344 {
345 for (auto itemIter(import.items.begin()); itemIter != import.items.end();
346 ++itemIter)
347 {
348 itemIter->consumer.entry().remoteBalance -= itemIter->balance;
349 }
350
351 iter = importTable_.erase(iter);
352 }
353 else
354 {
355 ++iter;
356 }
357 }
358 }
359
360 //--------------------------------------------------------------------------
361
362 // Returns the disposition based on the balance and thresholds
363 static Disposition
365 {
366 if (balance >= kDropThreshold)
367 return Disposition::Drop;
368
370 return Disposition::Warn;
371
372 return Disposition::Ok;
373 }
374
375 void
376 erase(Table::iterator iter)
377 {
378 std::scoped_lock const _(lock_);
379 Entry& entry(iter->second);
380 XRPL_ASSERT(entry.refcount == 0, "xrpl::Resource::Logic::erase : entry not used");
381 inactive_.erase(inactive_.iteratorTo(entry));
382 table_.erase(iter);
383 }
384
385 void
387 {
388 std::scoped_lock const _(lock_);
389 ++entry.refcount;
390 }
391
392 void
394 {
395 std::scoped_lock const _(lock_);
396 if (--entry.refcount == 0)
397 {
398 JLOG(journal_.debug()) << "Inactive " << entry;
399
400 switch (entry.key->kind)
401 {
402 case Kind::Inbound:
403 inbound_.erase(inbound_.iteratorTo(entry));
404 break;
405 case Kind::Outbound:
406 outbound_.erase(outbound_.iteratorTo(entry));
407 break;
408 case Kind::Unlimited:
409 admin_.erase(admin_.iteratorTo(entry));
410 break;
411 default:
412 // LCOV_EXCL_START
413 UNREACHABLE(
414 "xrpl::Resource::Logic::release : invalid entry "
415 "kind");
416 break;
417 // LCOV_EXCL_STOP
418 }
419 inactive_.pushBack(entry);
420 entry.whenExpires = clock_.now() + kSecondsUntilExpiration;
421 }
422 }
423
425 charge(Entry& entry, Charge const& fee, std::string context = {})
426 {
427 static constexpr Charge::value_type kFeeLogAsWarn = 3000;
428 static constexpr Charge::value_type kFeeLogAsInfo = 1000;
429 static constexpr Charge::value_type kFeeLogAsDebug = 100;
430 static_assert(
431 kFeeLogAsWarn > kFeeLogAsInfo && kFeeLogAsInfo > kFeeLogAsDebug && kFeeLogAsDebug > 10);
432
433 static auto kGetStream = [](Resource::Charge::value_type cost, beast::Journal& journal) {
434 if (cost >= kFeeLogAsWarn)
435 return journal.warn();
436 if (cost >= kFeeLogAsInfo)
437 return journal.info();
438 if (cost >= kFeeLogAsDebug)
439 return journal.debug();
440 return journal.trace();
441 };
442
443 if (!context.empty())
444 context = " (" + context + ")";
445
446 std::scoped_lock const _(lock_);
447 clock_type::time_point const now(clock_.now());
448 int const balance(entry.add(fee.cost(), now));
449 JLOG(kGetStream(fee.cost(), journal_)) << "Charging " << entry << " for " << fee << context;
450 return disposition(balance);
451 }
452
453 bool
454 warn(Entry& entry)
455 {
456 if (entry.isUnlimited())
457 return false;
458
459 std::scoped_lock const _(lock_);
460 bool notify(false);
461 auto const elapsed = clock_.now();
462 if (entry.balance(clock_.now()) >= kWarningThreshold && elapsed != entry.lastWarningTime)
463 {
464 charge(entry, kFeeWarning);
465 notify = true;
466 entry.lastWarningTime = elapsed;
467 }
468 if (notify)
469 {
470 JLOG(journal_.info()) << "Load warning: " << entry;
471 ++stats_.warn;
472 }
473 return notify;
474 }
475
476 bool
478 {
479 if (entry.isUnlimited())
480 return false;
481
482 std::scoped_lock const _(lock_);
483 bool drop(false);
484 clock_type::time_point const now(clock_.now());
485 int const balance(entry.balance(now));
486 if (balance >= kDropThreshold)
487 {
488 JLOG(journal_.warn()) << "Consumer entry " << entry << " dropped with balance "
489 << balance << " at or above drop threshold " << kDropThreshold;
490
491 // Adding feeDrop at this point keeps the dropped connection
492 // from re-connecting for at least a little while after it is
493 // dropped.
494 charge(entry, kFeeDrop);
495 ++stats_.drop;
496 drop = true;
497 }
498 return drop;
499 }
500
501 int
503 {
504 std::scoped_lock const _(lock_);
505 return entry.balance(clock_.now());
506 }
507
508 //--------------------------------------------------------------------------
509
510 static void
512 clock_type::time_point const now,
514 EntryIntrusiveList& list)
515 {
516 for (auto& entry : list)
517 {
518 beast::PropertyStream::Map item(items);
519 if (entry.refcount != 0)
520 item["count"] = entry.refcount;
521 item["name"] = entry.toString();
522 item["balance"] = entry.balance(now);
523 if (entry.remoteBalance != 0)
524 item["remote_balance"] = entry.remoteBalance;
525 }
526 }
527
528 void
530 {
531 clock_type::time_point const now(clock_.now());
532
533 std::scoped_lock const _(lock_);
534
535 {
536 beast::PropertyStream::Set s("inbound", map);
537 writeList(now, s, inbound_);
538 }
539
540 {
541 beast::PropertyStream::Set s("outbound", map);
542 writeList(now, s, outbound_);
543 }
544
545 {
546 beast::PropertyStream::Set s("admin", map);
547 writeList(now, s, admin_);
548 }
549
550 {
551 beast::PropertyStream::Set s("inactive", map);
552 writeList(now, s, inactive_);
553 }
554 }
555};
556
557} // namespace xrpl::Resource
std::chrono::steady_clock::time_point time_point
virtual time_point now() const =0
Returns the current time.
A version-independent IP address and port combination.
Definition IPEndpoint.h:17
Address const & address() const
Returns the address portion of this endpoint.
Definition IPEndpoint.h:54
Endpoint atPort(Port port) const
Returns a new Endpoint with a different port.
Definition IPEndpoint.h:47
A generic endpoint for log messages.
Definition Journal.h:38
Intrusive doubly linked list.
Definition List.h:260
std::shared_ptr< Collector > ptr
Definition Collector.h:26
A metric for measuring an integral value.
Definition Meter.h:18
Represents a JSON value.
Definition json_value.h:130
A consumption charge.
Definition Charge.h:9
value_type cost() const
Return the cost of the charge in Resource::Manager units.
Definition Charge.cpp:22
int value_type
The type used to hold a consumption charge.
Definition Charge.h:12
An endpoint that consumes resources.
Definition Consumer.h:15
Consumer newInboundEndpoint(beast::IP::Endpoint const &address)
Consumer newOutboundEndpoint(beast::IP::Endpoint const &address)
Consumer newUnlimitedEndpoint(beast::IP::Endpoint const &address)
Create endpoint that should not have resource limits applied.
Disposition charge(Entry &entry, Charge const &fee, std::string context={})
static void writeList(clock_type::time_point const now, beast::PropertyStream::Set &items, EntryIntrusiveList &list)
void importConsumers(std::string const &origin, Gossip const &gossip)
static Disposition disposition(int balance)
void onWrite(beast::PropertyStream::Map &map)
hash_map< std::string, Import > Imports
json::Value getJson(int threshold)
Returns a json::ValueType::Object.
hash_map< Key, Entry, Key::Hasher, Key::KeyEqual > Table
Logic(beast::insight::Collector::ptr const &collector, clock_type &clock, beast::Journal journal)
T make_tuple(T... args)
@ Object
object value (collection of name/value pairs).
Definition json_value.h:26
Disposition
The disposition of a consumer after applying a load charge.
Definition Disposition.h:6
@ Warn
Consumer should be disconnected for excess consumption.
Definition Disposition.h:12
@ Ok
No action required.
Definition Disposition.h:8
static constexpr std::chrono::seconds kSecondsUntilExpiration
static constexpr auto kMinimumGossipBalance
static constexpr auto kDropThreshold
static constexpr auto kWarningThreshold
Tunable constants.
Charge const kFeeDrop
Charge const kFeeWarning
static constexpr std::chrono::seconds kGossipExpirationSeconds
beast::AbstractClock< std::chrono::steady_clock > Stopwatch
A clock for measuring elapsed time.
Definition chrono.h:87
std::unordered_map< Key, Value, Hash, Pred, Allocator > hash_map
T piecewise_construct
Describes a single consumer.
Definition Gossip.h:16
beast::IP::Endpoint address
Definition Gossip.h:20
Data format for exchanging consumption information across peers.
Definition Gossip.h:11
std::vector< Item > items
Definition Gossip.h:23
A set of imported consumer data from a gossip origin.
Definition Import.h:10
Stats(beast::insight::Collector::ptr const &collector)
T swap(T... args)