rippled
Loading...
Searching...
No Matches
include/xrpl/resource/detail/Logic.h
1#ifndef XRPL_RESOURCE_LOGIC_H_INCLUDED
2#define XRPL_RESOURCE_LOGIC_H_INCLUDED
3
4#include <xrpl/basics/Log.h>
5#include <xrpl/basics/UnorderedContainers.h>
6#include <xrpl/basics/chrono.h>
7#include <xrpl/beast/clock/abstract_clock.h>
8#include <xrpl/beast/insight/Insight.h>
9#include <xrpl/beast/utility/PropertyStream.h>
10#include <xrpl/beast/utility/instrumentation.h>
11#include <xrpl/json/json_value.h>
12#include <xrpl/protocol/jss.h>
13#include <xrpl/resource/Fees.h>
14#include <xrpl/resource/Gossip.h>
15#include <xrpl/resource/detail/Import.h>
16
17#include <mutex>
18
19namespace ripple {
20namespace Resource {
21
22class Logic
23{
24private:
29
30 struct Stats
31 {
33 {
34 warn = collector->make_meter("warn");
35 drop = collector->make_meter("drop");
36 }
37
40 };
41
45
47
48 // Table of all entries
50
51 // Because the following are intrusive lists, a given Entry may be in
52 // at most one list at a given instant. The Entry must be removed from
53 // one list before placing it in another.
54
55 // List of all active inbound entries
57
58 // List of all active outbound entries
60
61 // List of all active admin entries
63
64 // List of all inactive entries
66
67 // All imported gossip data
69
70 //--------------------------------------------------------------------------
71public:
73 beast::insight::Collector::ptr const& collector,
74 clock_type& clock,
75 beast::Journal journal)
76 : m_stats(collector), m_clock(clock), m_journal(journal)
77 {
78 }
79
81 {
82 // These have to be cleared before the Logic is destroyed
83 // since their destructors call back into the class.
84 // Order matters here as well, the import table has to be
85 // destroyed before the consumer table.
86 //
88 table_.clear();
89 }
90
93 {
94 Entry* entry(nullptr);
95
96 {
98 auto [resultIt, resultInserted] = table_.emplace(
100 std::make_tuple(kindInbound, address.at_port(0)), // Key
101 std::make_tuple(m_clock.now())); // Entry
102
103 entry = &resultIt->second;
104 entry->key = &resultIt->first;
105 ++entry->refcount;
106 if (entry->refcount == 1)
107 {
108 if (!resultInserted)
109 {
111 }
112 inbound_.push_back(*entry);
113 }
114 }
115
116 JLOG(m_journal.debug()) << "New inbound endpoint " << *entry;
117
118 return Consumer(*this, *entry);
119 }
120
123 {
124 Entry* entry(nullptr);
125
126 {
128 auto [resultIt, resultInserted] = table_.emplace(
130 std::make_tuple(kindOutbound, address), // Key
131 std::make_tuple(m_clock.now())); // Entry
132
133 entry = &resultIt->second;
134 entry->key = &resultIt->first;
135 ++entry->refcount;
136 if (entry->refcount == 1)
137 {
138 if (!resultInserted)
140 outbound_.push_back(*entry);
141 }
142 }
143
144 JLOG(m_journal.debug()) << "New outbound endpoint " << *entry;
145
146 return Consumer(*this, *entry);
147 }
148
156 {
157 Entry* entry(nullptr);
158
159 {
161 auto [resultIt, resultInserted] = table_.emplace(
163 std::make_tuple(kindUnlimited, address.at_port(1)), // Key
164 std::make_tuple(m_clock.now())); // Entry
165
166 entry = &resultIt->second;
167 entry->key = &resultIt->first;
168 ++entry->refcount;
169 if (entry->refcount == 1)
170 {
171 if (!resultInserted)
173 admin_.push_back(*entry);
174 }
175 }
176
177 JLOG(m_journal.debug()) << "New unlimited endpoint " << *entry;
178
179 return Consumer(*this, *entry);
180 }
181
184 {
186 }
187
// Builds a JSON report of the consumers whose combined balance is at or
// above `threshold`.
//
// The inbound_, outbound_ and admin_ lists are walked in that order. For
// each entry the local balance is sampled with local_balance.value(now),
// and the entry is reported only when (local balance + remote_balance)
// is >= threshold. A reported entry becomes a member of `ret`, keyed by
// entry.to_string(), holding jss::local, jss::remote and a jss::type of
// "inbound", "outbound" or "admin" matching the list it came from.
//
// NOTE(review): listing lines 189 and 192-196 are absent from this
// rendering. They presumably carry the return type and the declarations
// of `now` and `ret` (plus any locking) -- confirm against the real header.
190 getJson(int threshold)
191 {
193
196
// Inbound consumers.
197 for (auto& inboundEntry : inbound_)
198 {
199 int localBalance = inboundEntry.local_balance.value(now);
200 if ((localBalance + inboundEntry.remote_balance) >= threshold)
201 {
202 Json::Value& entry =
203 (ret[inboundEntry.to_string()] = Json::objectValue);
204 entry[jss::local] = localBalance;
205 entry[jss::remote] = inboundEntry.remote_balance;
206 entry[jss::type] = "inbound";
207 }
208 }
// Outbound consumers: same filter and same fields as above.
209 for (auto& outboundEntry : outbound_)
210 {
211 int localBalance = outboundEntry.local_balance.value(now);
212 if ((localBalance + outboundEntry.remote_balance) >= threshold)
213 {
214 Json::Value& entry =
215 (ret[outboundEntry.to_string()] = Json::objectValue);
216 entry[jss::local] = localBalance;
217 entry[jss::remote] = outboundEntry.remote_balance;
218 entry[jss::type] = "outbound";
219 }
220 }
// Admin consumers: they are filtered by the same threshold test.
221 for (auto& adminEntry : admin_)
222 {
223 int localBalance = adminEntry.local_balance.value(now);
224 if ((localBalance + adminEntry.remote_balance) >= threshold)
225 {
226 Json::Value& entry =
227 (ret[adminEntry.to_string()] = Json::objectValue);
228 entry[jss::local] = localBalance;
229 entry[jss::remote] = adminEntry.remote_balance;
230 entry[jss::type] = "admin";
231 }
232 }
233
234 return ret;
235 }
236
237 Gossip
239 {
241
242 Gossip gossip;
244
245 gossip.items.reserve(inbound_.size());
246
247 for (auto& inboundEntry : inbound_)
248 {
249 Gossip::Item item;
250 item.balance = inboundEntry.local_balance.value(now);
251 if (item.balance >= minimumGossipBalance)
252 {
253 item.address = inboundEntry.key->address;
254 gossip.items.push_back(item);
255 }
256 }
257
258 return gossip;
259 }
260
261 //--------------------------------------------------------------------------
262
// Records the gossip received from `origin` in importTable_.
//
// For every item in gossip.items an inbound endpoint consumer is obtained
// through newInboundEndpoint() and the item's balance is added to that
// entry's remote_balance. The resulting Import expires at
// (time of this call + gossipExpirationSeconds).
//
// If `origin` already had an Import, the replacement is built first, then
// the balances contributed by the previous Import are subtracted, and
// finally the two are swapped so the table holds the new data.
//
// NOTE(review): listing lines 268, 270 and 272 are absent from this
// rendering (presumably a lock and the remaining emplace arguments) --
// confirm against the real header.
263 void
264 importConsumers(std::string const& origin, Gossip const& gossip)
265 {
// One clock reading is shared by both branches when computing whenExpires.
266 auto const elapsed = m_clock.now();
267 {
269 auto [resultIt, resultInserted] = importTable_.emplace(
271 std::make_tuple(origin), // Key
273 m_clock.now().time_since_epoch().count())); // Import
274
275 if (resultInserted)
276 {
277 // This is a new import
278 Import& next(resultIt->second);
279 next.whenExpires = elapsed + gossipExpirationSeconds;
280 next.items.reserve(gossip.items.size());
281
282 for (auto const& gossipItem : gossip.items)
283 {
284 Import::Item item;
285 item.balance = gossipItem.balance;
286 item.consumer = newInboundEndpoint(gossipItem.address);
287 item.consumer.entry().remote_balance += item.balance;
288 next.items.push_back(item);
289 }
290 }
291 else
292 {
293 // Previous import exists so add the new remote
294 // balances and then deduct the old remote balances.
295
// The replacement is built in a local Import rather than in place.
296 Import next;
297 next.whenExpires = elapsed + gossipExpirationSeconds;
298 next.items.reserve(gossip.items.size());
299 for (auto const& gossipItem : gossip.items)
300 {
301 Import::Item item;
302 item.balance = gossipItem.balance;
303 item.consumer = newInboundEndpoint(gossipItem.address);
304 item.consumer.entry().remote_balance += item.balance;
305 next.items.push_back(item);
306 }
307
// Undo the remote balance contributed by the previous gossip.
308 Import& prev(resultIt->second);
309 for (auto& item : prev.items)
310 {
311 item.consumer.entry().remote_balance -= item.balance;
312 }
313
// After the swap the table slot holds the new Import; the old items now
// live in the local `next` and go away when this block ends.
314 std::swap(next, prev);
315 }
316 }
317 }
318
319 //--------------------------------------------------------------------------
320
321 // Called periodically to expire entries and groom the table.
322 //
323 void
325 {
327
328 auto const elapsed = m_clock.now();
329
330 for (auto iter(inactive_.begin()); iter != inactive_.end();)
331 {
332 if (iter->whenExpires <= elapsed)
333 {
334 JLOG(m_journal.debug()) << "Expired " << *iter;
335 auto table_iter = table_.find(*iter->key);
336 ++iter;
337 erase(table_iter);
338 }
339 else
340 {
341 break;
342 }
343 }
344
345 auto iter = importTable_.begin();
346 while (iter != importTable_.end())
347 {
348 Import& import(iter->second);
349 if (iter->second.whenExpires <= elapsed)
350 {
351 for (auto item_iter(import.items.begin());
352 item_iter != import.items.end();
353 ++item_iter)
354 {
355 item_iter->consumer.entry().remote_balance -=
356 item_iter->balance;
357 }
358
359 iter = importTable_.erase(iter);
360 }
361 else
362 ++iter;
363 }
364 }
365
366 //--------------------------------------------------------------------------
367
368 // Returns the disposition based on the balance and thresholds
369 static Disposition
371 {
372 if (balance >= dropThreshold)
373 return Disposition::drop;
374
376 return Disposition::warn;
377
378 return Disposition::ok;
379 }
380
// Removes the entry referenced by `iter` from table_.
//
// The entry is asserted to have a refcount of zero before it is erased.
//
// NOTE(review): listing lines 384 and 389 are absent from this rendering
// (presumably a lock and the removal of the entry from its intrusive
// list) -- confirm against the real header.
381 void
382 erase(Table::iterator iter)
383 {
385 Entry& entry(iter->second);
// No consumer may still hold a reference to an entry being erased.
386 XRPL_ASSERT(
387 entry.refcount == 0,
388 "ripple::Resource::Logic::erase : entry not used");
390 table_.erase(iter);
391 }
392
393 void
395 {
397 ++entry.refcount;
398 }
399
400 void
402 {
404 if (--entry.refcount == 0)
405 {
406 JLOG(m_journal.debug()) << "Inactive " << entry;
407
408 switch (entry.key->kind)
409 {
410 case kindInbound:
412 break;
413 case kindOutbound:
415 break;
416 case kindUnlimited:
418 break;
419 default:
420 // LCOV_EXCL_START
421 UNREACHABLE(
422 "ripple::Resource::Logic::release : invalid entry "
423 "kind");
424 break;
425 // LCOV_EXCL_STOP
426 }
427 inactive_.push_back(entry);
428 entry.whenExpires = m_clock.now() + secondsUntilExpiration;
429 }
430 }
431
// Applies `fee` to `entry` and reports the resulting disposition.
//
// fee.cost() is added to the entry through entry.add(), the charge is
// logged, and disposition() of the balance returned by entry.add() is
// returned. `context`, when non-empty, is appended to the log line
// wrapped as " (context)".
//
// NOTE(review): listing lines 432 and 456-457 are absent from this
// rendering (presumably the return type, a lock and the declaration of
// `now`) -- confirm against the real header.
433 charge(Entry& entry, Charge const& fee, std::string context = {})
434 {
// Cost thresholds that select the log severity for a charge.
435 static constexpr Charge::value_type feeLogAsWarn = 3000;
436 static constexpr Charge::value_type feeLogAsInfo = 1000;
437 static constexpr Charge::value_type feeLogAsDebug = 100;
// The thresholds must be strictly descending, and debug must stay above 10.
438 static_assert(
439 feeLogAsWarn > feeLogAsInfo && feeLogAsInfo > feeLogAsDebug &&
440 feeLogAsDebug > 10);
441
// Picks the journal stream for a cost: warn, info, debug, otherwise trace.
442 static auto getStream = [](Resource::Charge::value_type cost,
443 beast::Journal& journal) {
444 if (cost >= feeLogAsWarn)
445 return journal.warn();
446 if (cost >= feeLogAsInfo)
447 return journal.info();
448 if (cost >= feeLogAsDebug)
449 return journal.debug();
450 return journal.trace();
451 };
452
// Decorate the caller-supplied context for the log message only.
453 if (!context.empty())
454 context = " (" + context + ")";
455
458 int const balance(entry.add(fee.cost(), now));
459 JLOG(getStream(fee.cost(), m_journal))
460 << "Charging " << entry << " for " << fee << context;
461 return disposition(balance);
462 }
463
// Decides whether `entry` should be sent a load warning.
//
// Unlimited entries are never warned. Otherwise, when the entry's balance
// is at or above warningThreshold and the current clock reading differs
// from entry.lastWarningTime, the entry is charged feeWarning, its
// lastWarningTime is updated, the warning is logged and m_stats.warn is
// incremented.
//
// Returns true if a warning was issued, false otherwise.
//
// NOTE(review): listing line 470 is absent from this rendering
// (presumably a lock) -- confirm against the real header.
464 bool
465 warn(Entry& entry)
466 {
467 if (entry.isUnlimited())
468 return false;
469
471 bool notify(false);
472 auto const elapsed = m_clock.now();
// NOTE(review): the balance is sampled with a second m_clock.now() call
// rather than `elapsed`; confirm whether that is intentional.
473 if (entry.balance(m_clock.now()) >= warningThreshold &&
474 elapsed != entry.lastWarningTime)
475 {
476 charge(entry, feeWarning);
477 notify = true;
// Remember this reading so the same clock value does not warn twice.
478 entry.lastWarningTime = elapsed;
479 }
480 if (notify)
481 {
482 JLOG(m_journal.info()) << "Load warning: " << entry;
483 ++m_stats.warn;
484 }
485 return notify;
486 }
487
488 bool
490 {
491 if (entry.isUnlimited())
492 return false;
493
495 bool drop(false);
497 int const balance(entry.balance(now));
498 if (balance >= dropThreshold)
499 {
500 JLOG(m_journal.warn())
501 << "Consumer entry " << entry << " dropped with balance "
502 << balance << " at or above drop threshold " << dropThreshold;
503
504 // Adding feeDrop at this point keeps the dropped connection
505 // from re-connecting for at least a little while after it is
506 // dropped.
507 charge(entry, feeDrop);
508 ++m_stats.drop;
509 drop = true;
510 }
511 return drop;
512 }
513
514 int
516 {
518 return entry.balance(m_clock.now());
519 }
520
521 //--------------------------------------------------------------------------
522
523 void
525 clock_type::time_point const now,
527 EntryIntrusiveList& list)
528 {
529 for (auto& entry : list)
530 {
531 beast::PropertyStream::Map item(items);
532 if (entry.refcount != 0)
533 item["count"] = entry.refcount;
534 item["name"] = entry.to_string();
535 item["balance"] = entry.balance(now);
536 if (entry.remote_balance != 0)
537 item["remote_balance"] = entry.remote_balance;
538 }
539 }
540
541 void
543 {
545
547
548 {
549 beast::PropertyStream::Set s("inbound", map);
550 writeList(now, s, inbound_);
551 }
552
553 {
554 beast::PropertyStream::Set s("outbound", map);
555 writeList(now, s, outbound_);
556 }
557
558 {
559 beast::PropertyStream::Set s("admin", map);
560 writeList(now, s, admin_);
561 }
562
563 {
564 beast::PropertyStream::Set s("inactive", map);
565 writeList(now, s, inactive_);
566 }
567 }
568};
569
570} // namespace Resource
571} // namespace ripple
572
573#endif
T begin(T... args)
Represents a JSON value.
Definition json_value.h:130
A version-independent IP address and port combination.
Definition IPEndpoint.h:19
Address const & address() const
Returns the address portion of this endpoint.
Definition IPEndpoint.h:56
Endpoint at_port(Port port) const
Returns a new Endpoint with a different port.
Definition IPEndpoint.h:49
A generic endpoint for log messages.
Definition Journal.h:41
Stream debug() const
Definition Journal.h:309
Stream info() const
Definition Journal.h:315
Stream warn() const
Definition Journal.h:321
iterator iterator_to(T &element) const noexcept
Obtain an iterator from an element.
Definition List.h:542
iterator push_back(T &element) noexcept
Append an element at the end of the list.
Definition List.h:489
iterator begin() noexcept
Obtain an iterator to the beginning of the list.
Definition List.h:347
iterator end() noexcept
Obtain a iterator to the end of the list.
Definition List.h:374
size_type size() const noexcept
Returns the number of elements in the list.
Definition List.h:298
iterator erase(iterator pos) noexcept
Remove an element.
Definition List.h:452
virtual time_point now() const =0
Returns the current time.
A metric for measuring an integral value.
Definition Meter.h:19
A consumption charge.
Definition Charge.h:11
int value_type
The type used to hold a consumption charge.
Definition Charge.h:14
value_type cost() const
Return the cost of the charge in Resource::Manager units.
Definition Charge.cpp:23
An endpoint that consumes resources.
Definition Consumer.h:17
Consumer newInboundEndpoint(beast::IP::Endpoint const &address)
void importConsumers(std::string const &origin, Gossip const &gossip)
Consumer newUnlimitedEndpoint(beast::IP::Endpoint const &address)
Create endpoint that should not have resource limits applied.
void onWrite(beast::PropertyStream::Map &map)
Json::Value getJson(int threshold)
Returns a Json::objectValue.
Logic(beast::insight::Collector::ptr const &collector, clock_type &clock, beast::Journal journal)
Consumer newOutboundEndpoint(beast::IP::Endpoint const &address)
Disposition charge(Entry &entry, Charge const &fee, std::string context={})
void writeList(clock_type::time_point const now, beast::PropertyStream::Set &items, EntryIntrusiveList &list)
static Disposition disposition(int balance)
T clear(T... args)
T emplace(T... args)
T end(T... args)
T erase(T... args)
T find(T... args)
T make_tuple(T... args)
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:26
std::chrono::seconds constexpr gossipExpirationSeconds
Charge const feeDrop
Charge const feeWarning
Disposition
The disposition of a consumer after applying a load charge.
Definition Disposition.h:8
@ ok
No action required.
Definition Disposition.h:10
@ warn
Consumer should be disconnected for excess consumption.
Definition Disposition.h:14
std::chrono::seconds constexpr secondsUntilExpiration
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
beast::abstract_clock< std::chrono::steady_clock > Stopwatch
A clock for measuring elapsed time.
Definition chrono.h:93
T piecewise_construct
Describes a single consumer.
Definition Gossip.h:18
beast::IP::Endpoint address
Definition Gossip.h:22
Data format for exchanging consumption information across peers.
Definition Gossip.h:13
std::vector< Item > items
Definition Gossip.h:25
A set of imported consumer data from a gossip origin.
Definition Import.h:12
Stats(beast::insight::Collector::ptr const &collector)
T swap(T... args)