rippled
Loading...
Searching...
No Matches
include/xrpl/resource/detail/Logic.h
1#pragma once
2
3#include <xrpl/basics/Log.h>
4#include <xrpl/basics/UnorderedContainers.h>
5#include <xrpl/basics/chrono.h>
6#include <xrpl/beast/clock/abstract_clock.h>
7#include <xrpl/beast/insight/Insight.h>
8#include <xrpl/beast/utility/PropertyStream.h>
9#include <xrpl/beast/utility/instrumentation.h>
10#include <xrpl/json/json_value.h>
11#include <xrpl/protocol/jss.h>
12#include <xrpl/resource/Fees.h>
13#include <xrpl/resource/Gossip.h>
14#include <xrpl/resource/detail/Import.h>
15
16#include <mutex>
17
18namespace xrpl {
19namespace Resource {
20
21class Logic
22{
23private:
28
29 struct Stats
30 {
32 {
33 warn = collector->make_meter("warn");
34 drop = collector->make_meter("drop");
35 }
36
39 };
40
44
46
47 // Table of all entries
49
50 // Because the following are intrusive lists, a given Entry may be in
51 // at most list at a given instant. The Entry must be removed from
52 // one list before placing it in another.
53
54 // List of all active inbound entries
56
57 // List of all active outbound entries
59
60 // List of all active admin entries
62
63 // List of all inactive entries
65
66 // All imported gossip data
68
69 //--------------------------------------------------------------------------
70public:
72 beast::insight::Collector::ptr const& collector,
73 clock_type& clock,
74 beast::Journal journal)
75 : m_stats(collector), m_clock(clock), m_journal(journal)
76 {
77 }
78
80 {
81 // These have to be cleared before the Logic is destroyed
82 // since their destructors call back into the class.
83 // Order matters here as well, the import table has to be
84 // destroyed before the consumer table.
85 //
87 table_.clear();
88 }
89
92 {
93 Entry* entry(nullptr);
94
95 {
96 std::lock_guard const _(lock_);
97 auto [resultIt, resultInserted] = table_.emplace(
99 std::make_tuple(kindInbound, address.at_port(0)), // Key
100 std::make_tuple(m_clock.now())); // Entry
101
102 entry = &resultIt->second;
103 entry->key = &resultIt->first;
104 ++entry->refcount;
105 if (entry->refcount == 1)
106 {
107 if (!resultInserted)
108 {
110 }
111 inbound_.push_back(*entry);
112 }
113 }
114
115 JLOG(m_journal.debug()) << "New inbound endpoint " << *entry;
116
117 return Consumer(*this, *entry);
118 }
119
122 {
123 Entry* entry(nullptr);
124
125 {
126 std::lock_guard const _(lock_);
127 auto [resultIt, resultInserted] = table_.emplace(
129 std::make_tuple(kindOutbound, address), // Key
130 std::make_tuple(m_clock.now())); // Entry
131
132 entry = &resultIt->second;
133 entry->key = &resultIt->first;
134 ++entry->refcount;
135 if (entry->refcount == 1)
136 {
137 if (!resultInserted)
139 outbound_.push_back(*entry);
140 }
141 }
142
143 JLOG(m_journal.debug()) << "New outbound endpoint " << *entry;
144
145 return Consumer(*this, *entry);
146 }
147
155 {
156 Entry* entry(nullptr);
157
158 {
159 std::lock_guard const _(lock_);
160 auto [resultIt, resultInserted] = table_.emplace(
162 std::make_tuple(kindUnlimited, address.at_port(1)), // Key
163 std::make_tuple(m_clock.now())); // Entry
164
165 entry = &resultIt->second;
166 entry->key = &resultIt->first;
167 ++entry->refcount;
168 if (entry->refcount == 1)
169 {
170 if (!resultInserted)
172 admin_.push_back(*entry);
173 }
174 }
175
176 JLOG(m_journal.debug()) << "New unlimited endpoint " << *entry;
177
178 return Consumer(*this, *entry);
179 }
180
183 {
185 }
186
189 getJson(int threshold)
190 {
192
194 std::lock_guard const _(lock_);
195
196 for (auto& inboundEntry : inbound_)
197 {
198 int const localBalance = inboundEntry.local_balance.value(now);
199 if ((localBalance + inboundEntry.remote_balance) >= threshold)
200 {
201 Json::Value& entry = (ret[inboundEntry.to_string()] = Json::objectValue);
202 entry[jss::local] = localBalance;
203 entry[jss::remote] = inboundEntry.remote_balance;
204 entry[jss::type] = "inbound";
205 }
206 }
207 for (auto& outboundEntry : outbound_)
208 {
209 int const localBalance = outboundEntry.local_balance.value(now);
210 if ((localBalance + outboundEntry.remote_balance) >= threshold)
211 {
212 Json::Value& entry = (ret[outboundEntry.to_string()] = Json::objectValue);
213 entry[jss::local] = localBalance;
214 entry[jss::remote] = outboundEntry.remote_balance;
215 entry[jss::type] = "outbound";
216 }
217 }
218 for (auto& adminEntry : admin_)
219 {
220 int const localBalance = adminEntry.local_balance.value(now);
221 if ((localBalance + adminEntry.remote_balance) >= threshold)
222 {
223 Json::Value& entry = (ret[adminEntry.to_string()] = Json::objectValue);
224 entry[jss::local] = localBalance;
225 entry[jss::remote] = adminEntry.remote_balance;
226 entry[jss::type] = "admin";
227 }
228 }
229
230 return ret;
231 }
232
233 Gossip
235 {
237
238 Gossip gossip;
239 std::lock_guard const _(lock_);
240
241 gossip.items.reserve(inbound_.size());
242
243 for (auto& inboundEntry : inbound_)
244 {
245 Gossip::Item item;
246 item.balance = inboundEntry.local_balance.value(now);
247 if (item.balance >= minimumGossipBalance)
248 {
249 item.address = inboundEntry.key->address;
250 gossip.items.push_back(item);
251 }
252 }
253
254 return gossip;
255 }
256
257 //--------------------------------------------------------------------------
258
259 void
260 importConsumers(std::string const& origin, Gossip const& gossip)
261 {
262 auto const elapsed = m_clock.now();
263 {
264 std::lock_guard const _(lock_);
265 auto [resultIt, resultInserted] = importTable_.emplace(
267 std::make_tuple(origin), // Key
268 std::make_tuple(m_clock.now().time_since_epoch().count())); // Import
269
270 if (resultInserted)
271 {
272 // This is a new import
273 Import& next(resultIt->second);
274 next.whenExpires = elapsed + gossipExpirationSeconds;
275 next.items.reserve(gossip.items.size());
276
277 for (auto const& gossipItem : gossip.items)
278 {
279 Import::Item item;
280 item.balance = gossipItem.balance;
281 item.consumer = newInboundEndpoint(gossipItem.address);
282 item.consumer.entry().remote_balance += item.balance;
283 next.items.push_back(item);
284 }
285 }
286 else
287 {
288 // Previous import exists so add the new remote
289 // balances and then deduct the old remote balances.
290
291 Import next;
292 next.whenExpires = elapsed + gossipExpirationSeconds;
293 next.items.reserve(gossip.items.size());
294 for (auto const& gossipItem : gossip.items)
295 {
296 Import::Item item;
297 item.balance = gossipItem.balance;
298 item.consumer = newInboundEndpoint(gossipItem.address);
299 item.consumer.entry().remote_balance += item.balance;
300 next.items.push_back(item);
301 }
302
303 Import& prev(resultIt->second);
304 for (auto& item : prev.items)
305 {
306 item.consumer.entry().remote_balance -= item.balance;
307 }
308
309 std::swap(next, prev);
310 }
311 }
312 }
313
314 //--------------------------------------------------------------------------
315
316 // Called periodically to expire entries and groom the table.
317 //
318 void
320 {
321 std::lock_guard const _(lock_);
322
323 auto const elapsed = m_clock.now();
324
325 for (auto iter(inactive_.begin()); iter != inactive_.end();)
326 {
327 if (iter->whenExpires <= elapsed)
328 {
329 JLOG(m_journal.debug()) << "Expired " << *iter;
330 auto table_iter = table_.find(*iter->key);
331 ++iter;
332 erase(table_iter);
333 }
334 else
335 {
336 break;
337 }
338 }
339
340 auto iter = importTable_.begin();
341 while (iter != importTable_.end())
342 {
343 Import& import(iter->second);
344 if (iter->second.whenExpires <= elapsed)
345 {
346 for (auto item_iter(import.items.begin()); item_iter != import.items.end();
347 ++item_iter)
348 {
349 item_iter->consumer.entry().remote_balance -= item_iter->balance;
350 }
351
352 iter = importTable_.erase(iter);
353 }
354 else
355 ++iter;
356 }
357 }
358
359 //--------------------------------------------------------------------------
360
361 // Returns the disposition based on the balance and thresholds
362 static Disposition
364 {
365 if (balance >= dropThreshold)
366 return Disposition::drop;
367
369 return Disposition::warn;
370
371 return Disposition::ok;
372 }
373
374 void
375 erase(Table::iterator iter)
376 {
377 std::lock_guard const _(lock_);
378 Entry& entry(iter->second);
379 XRPL_ASSERT(entry.refcount == 0, "xrpl::Resource::Logic::erase : entry not used");
381 table_.erase(iter);
382 }
383
384 void
386 {
387 std::lock_guard const _(lock_);
388 ++entry.refcount;
389 }
390
391 void
393 {
394 std::lock_guard const _(lock_);
395 if (--entry.refcount == 0)
396 {
397 JLOG(m_journal.debug()) << "Inactive " << entry;
398
399 switch (entry.key->kind)
400 {
401 case kindInbound:
403 break;
404 case kindOutbound:
406 break;
407 case kindUnlimited:
409 break;
410 default:
411 // LCOV_EXCL_START
412 UNREACHABLE(
413 "xrpl::Resource::Logic::release : invalid entry "
414 "kind");
415 break;
416 // LCOV_EXCL_STOP
417 }
418 inactive_.push_back(entry);
419 entry.whenExpires = m_clock.now() + secondsUntilExpiration;
420 }
421 }
422
424 charge(Entry& entry, Charge const& fee, std::string context = {})
425 {
426 static constexpr Charge::value_type feeLogAsWarn = 3000;
427 static constexpr Charge::value_type feeLogAsInfo = 1000;
428 static constexpr Charge::value_type feeLogAsDebug = 100;
429 static_assert(
430 feeLogAsWarn > feeLogAsInfo && feeLogAsInfo > feeLogAsDebug && feeLogAsDebug > 10);
431
432 static auto getStream = [](Resource::Charge::value_type cost, beast::Journal& journal) {
433 if (cost >= feeLogAsWarn)
434 return journal.warn();
435 if (cost >= feeLogAsInfo)
436 return journal.info();
437 if (cost >= feeLogAsDebug)
438 return journal.debug();
439 return journal.trace();
440 };
441
442 if (!context.empty())
443 context = " (" + context + ")";
444
445 std::lock_guard const _(lock_);
447 int const balance(entry.add(fee.cost(), now));
448 JLOG(getStream(fee.cost(), m_journal)) << "Charging " << entry << " for " << fee << context;
449 return disposition(balance);
450 }
451
452 bool
453 warn(Entry& entry)
454 {
455 if (entry.isUnlimited())
456 return false;
457
458 std::lock_guard const _(lock_);
459 bool notify(false);
460 auto const elapsed = m_clock.now();
461 if (entry.balance(m_clock.now()) >= warningThreshold && elapsed != entry.lastWarningTime)
462 {
463 charge(entry, feeWarning);
464 notify = true;
465 entry.lastWarningTime = elapsed;
466 }
467 if (notify)
468 {
469 JLOG(m_journal.info()) << "Load warning: " << entry;
470 ++m_stats.warn;
471 }
472 return notify;
473 }
474
475 bool
477 {
478 if (entry.isUnlimited())
479 return false;
480
481 std::lock_guard const _(lock_);
482 bool drop(false);
484 int const balance(entry.balance(now));
485 if (balance >= dropThreshold)
486 {
487 JLOG(m_journal.warn()) << "Consumer entry " << entry << " dropped with balance "
488 << balance << " at or above drop threshold " << dropThreshold;
489
490 // Adding feeDrop at this point keeps the dropped connection
491 // from re-connecting for at least a little while after it is
492 // dropped.
493 charge(entry, feeDrop);
494 ++m_stats.drop;
495 drop = true;
496 }
497 return drop;
498 }
499
500 int
502 {
503 std::lock_guard const _(lock_);
504 return entry.balance(m_clock.now());
505 }
506
507 //--------------------------------------------------------------------------
508
509 void
511 clock_type::time_point const now,
513 EntryIntrusiveList& list)
514 {
515 for (auto& entry : list)
516 {
517 beast::PropertyStream::Map item(items);
518 if (entry.refcount != 0)
519 item["count"] = entry.refcount;
520 item["name"] = entry.to_string();
521 item["balance"] = entry.balance(now);
522 if (entry.remote_balance != 0)
523 item["remote_balance"] = entry.remote_balance;
524 }
525 }
526
527 void
529 {
531
532 std::lock_guard const _(lock_);
533
534 {
535 beast::PropertyStream::Set s("inbound", map);
536 writeList(now, s, inbound_);
537 }
538
539 {
540 beast::PropertyStream::Set s("outbound", map);
541 writeList(now, s, outbound_);
542 }
543
544 {
545 beast::PropertyStream::Set s("admin", map);
546 writeList(now, s, admin_);
547 }
548
549 {
550 beast::PropertyStream::Set s("inactive", map);
551 writeList(now, s, inactive_);
552 }
553 }
554};
555
556} // namespace Resource
557} // namespace xrpl
T begin(T... args)
Represents a JSON value.
Definition json_value.h:130
A version-independent IP address and port combination.
Definition IPEndpoint.h:18
Address const & address() const
Returns the address portion of this endpoint.
Definition IPEndpoint.h:55
Endpoint at_port(Port port) const
Returns a new Endpoint with a different port.
Definition IPEndpoint.h:48
A generic endpoint for log messages.
Definition Journal.h:40
Stream debug() const
Definition Journal.h:301
Stream info() const
Definition Journal.h:307
Stream warn() const
Definition Journal.h:313
iterator iterator_to(T &element) const noexcept
Obtain an iterator from an element.
Definition List.h:540
iterator push_back(T &element) noexcept
Append an element at the end of the list.
Definition List.h:487
iterator begin() noexcept
Obtain an iterator to the beginning of the list.
Definition List.h:345
iterator end() noexcept
Obtain a iterator to the end of the list.
Definition List.h:372
size_type size() const noexcept
Returns the number of elements in the list.
Definition List.h:296
iterator erase(iterator pos) noexcept
Remove an element.
Definition List.h:450
virtual time_point now() const =0
Returns the current time.
A metric for measuring an integral value.
Definition Meter.h:18
A consumption charge.
Definition Charge.h:10
value_type cost() const
Return the cost of the charge in Resource::Manager units.
Definition Charge.cpp:22
int value_type
The type used to hold a consumption charge.
Definition Charge.h:13
An endpoint that consumes resources.
Definition Consumer.h:16
Consumer newInboundEndpoint(beast::IP::Endpoint const &address)
Consumer newOutboundEndpoint(beast::IP::Endpoint const &address)
Consumer newUnlimitedEndpoint(beast::IP::Endpoint const &address)
Create endpoint that should not have resource limits applied.
void writeList(clock_type::time_point const now, beast::PropertyStream::Set &items, EntryIntrusiveList &list)
Disposition charge(Entry &entry, Charge const &fee, std::string context={})
void importConsumers(std::string const &origin, Gossip const &gossip)
static Disposition disposition(int balance)
void onWrite(beast::PropertyStream::Map &map)
Json::Value getJson(int threshold)
Returns a Json::objectValue.
Logic(beast::insight::Collector::ptr const &collector, clock_type &clock, beast::Journal journal)
T clear(T... args)
T emplace(T... args)
T end(T... args)
T erase(T... args)
T find(T... args)
T make_tuple(T... args)
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:26
Charge const feeWarning
Charge const feeDrop
Disposition
The disposition of a consumer after applying a load charge.
Definition Disposition.h:7
@ warn
Consumer should be disconnected for excess consumption.
Definition Disposition.h:13
@ ok
No action required.
Definition Disposition.h:9
std::chrono::seconds constexpr gossipExpirationSeconds
std::chrono::seconds constexpr secondsUntilExpiration
@ kindInbound
Definition Kind.h:14
@ kindUnlimited
Definition Kind.h:14
@ kindOutbound
Definition Kind.h:14
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
beast::abstract_clock< std::chrono::steady_clock > Stopwatch
A clock for measuring elapsed time.
Definition chrono.h:87
T piecewise_construct
Describes a single consumer.
Definition Gossip.h:17
beast::IP::Endpoint address
Definition Gossip.h:21
Data format for exchanging consumption information across peers.
Definition Gossip.h:12
std::vector< Item > items
Definition Gossip.h:24
A set of imported consumer data from a gossip origin.
Definition Import.h:11
Stats(beast::insight::Collector::ptr const &collector)
T swap(T... args)