rippled
Loading...
Searching...
No Matches
include/xrpl/resource/detail/Logic.h
1#pragma once
2
3#include <xrpl/basics/Log.h>
4#include <xrpl/basics/UnorderedContainers.h>
5#include <xrpl/basics/chrono.h>
6#include <xrpl/beast/clock/abstract_clock.h>
7#include <xrpl/beast/insight/Insight.h>
8#include <xrpl/beast/utility/PropertyStream.h>
9#include <xrpl/beast/utility/instrumentation.h>
10#include <xrpl/json/json_value.h>
11#include <xrpl/protocol/jss.h>
12#include <xrpl/resource/Fees.h>
13#include <xrpl/resource/Gossip.h>
14#include <xrpl/resource/detail/Import.h>
15
16#include <mutex>
17
18namespace xrpl {
19namespace Resource {
20
21class Logic
22{
23private:
28
29 struct Stats
30 {
32 {
33 warn = collector->make_meter("warn");
34 drop = collector->make_meter("drop");
35 }
36
39 };
40
44
46
47 // Table of all entries
49
50 // Because the following are intrusive lists, a given Entry may be in
51 // at most one list at a given instant. The Entry must be removed from
52 // one list before placing it in another.
53
54 // List of all active inbound entries
56
57 // List of all active outbound entries
59
60 // List of all active admin entries
62
63 // List of all inactive entries
65
66 // All imported gossip data
68
69 //--------------------------------------------------------------------------
70public:
// Constructor: initialises m_stats from the collector and m_clock / m_journal
// from the corresponding arguments; the body itself does nothing.
// NOTE(review): the signature (original line 71) is missing from this listing.
72 : m_stats(collector), m_clock(clock), m_journal(journal)
73 {
74 }
75
// Destructor body. NOTE(review): the declaration (original line 76) is missing
// from this listing.
77 {
78 // These have to be cleared before the Logic is destroyed
79 // since their destructors call back into the class.
80 // Order matters here as well, the import table has to be
81 // destroyed before the consumer table.
82 //
// NOTE(review): original line 83 is absent here; going by the comment above it
// presumably clears the import table before table_ -- confirm against the
// real header.
84 table_.clear();
85 }
86
// Body of the inbound endpoint factory. Finds or creates the table entry keyed
// by (kindInbound, address with its port replaced by 0), takes a reference on
// it, and returns a Consumer bound to this Logic and that entry.
// NOTE(review): the signature (original lines 87-88) and original lines 93, 95
// and 106 are missing from this listing.
89 {
90 Entry* entry(nullptr);
91
92 {
// resultInserted tells whether emplace created a new entry or found one.
94 auto [resultIt, resultInserted] = table_.emplace(
// at_port(0) discards the caller's port, so the key does not depend on it.
96 std::make_tuple(kindInbound, address.at_port(0)), // Key
97 std::make_tuple(m_clock.now())); // Entry
98
99 entry = &resultIt->second;
// The entry keeps a pointer back to its own key inside the table.
100 entry->key = &resultIt->first;
101 ++entry->refcount;
// First reference: the entry is appended to the active inbound list.
102 if (entry->refcount == 1)
103 {
// The entry already existed but had no references.
// NOTE(review): the guarded statement (original line 106) is not shown.
104 if (!resultInserted)
105 {
107 }
108 inbound_.push_back(*entry);
109 }
110 }
111
112 JLOG(m_journal.debug()) << "New inbound endpoint " << *entry;
113
114 return Consumer(*this, *entry);
115 }
116
// Body of the outbound endpoint factory. Finds or creates the table entry
// keyed by (kindOutbound, address) -- the port is kept, unlike the inbound
// variant -- takes a reference on it, and returns a Consumer bound to it.
// NOTE(review): the signature and original lines 123, 125 and 135 are missing
// from this listing.
119 {
120 Entry* entry(nullptr);
121
122 {
// resultInserted tells whether emplace created a new entry or found one.
124 auto [resultIt, resultInserted] = table_.emplace(
126 std::make_tuple(kindOutbound, address), // Key
127 std::make_tuple(m_clock.now())); // Entry
128
129 entry = &resultIt->second;
// The entry keeps a pointer back to its own key inside the table.
130 entry->key = &resultIt->first;
131 ++entry->refcount;
132 if (entry->refcount == 1)
133 {
// NOTE(review): the statement this `if` guards (original line 135) is missing
// from the listing. As displayed, the push_back below looks conditional, which
// does not match the inbound variant -- confirm against the real header.
134 if (!resultInserted)
136 outbound_.push_back(*entry);
137 }
138 }
139
140 JLOG(m_journal.debug()) << "New outbound endpoint " << *entry;
141
142 return Consumer(*this, *entry);
143 }
144
// Body of the unlimited (admin) endpoint factory. Finds or creates the table
// entry keyed by (kindUnlimited, address with its port replaced by 1), takes a
// reference on it, and returns a Consumer bound to it.
// NOTE(review): the signature and original lines 156, 158 and 168 are missing
// from this listing.
152 {
153 Entry* entry(nullptr);
154
155 {
// resultInserted tells whether emplace created a new entry or found one.
157 auto [resultIt, resultInserted] = table_.emplace(
// at_port(1) replaces the caller's port with the fixed value 1.
159 std::make_tuple(kindUnlimited, address.at_port(1)), // Key
160 std::make_tuple(m_clock.now())); // Entry
161
162 entry = &resultIt->second;
// The entry keeps a pointer back to its own key inside the table.
163 entry->key = &resultIt->first;
164 ++entry->refcount;
165 if (entry->refcount == 1)
166 {
// NOTE(review): the statement this `if` guards (original line 168) is missing
// from the listing. As displayed, the push_back below looks conditional --
// confirm against the real header.
167 if (!resultInserted)
169 admin_.push_back(*entry);
170 }
171 }
172
173 JLOG(m_journal.debug()) << "New unlimited endpoint " << *entry;
174
175 return Consumer(*this, *entry);
176 }
177
// NOTE(review): only the braces of this member survive in the listing; its
// signature (original lines 178-179) and its single body line (original line
// 181) are missing, so its purpose cannot be stated from here.
180 {
182 }
183
// Builds a JSON object describing the entries on the inbound, outbound and
// admin lists whose combined balance -- local_balance.value(now) plus
// remote_balance -- is at least `threshold`. Each reported entry becomes a
// member keyed by entry.to_string() holding jss::local, jss::remote and
// jss::type.
// NOTE(review): the return-type line and the declarations of `now` and `ret`
// (original lines 185, 188-191) are missing from this listing.
186 getJson(int threshold)
187 {
189
192
193 for (auto& inboundEntry : inbound_)
194 {
195 int localBalance = inboundEntry.local_balance.value(now);
196 if ((localBalance + inboundEntry.remote_balance) >= threshold)
197 {
// Assigning objectValue creates the member; `entry` then refers to it.
198 Json::Value& entry = (ret[inboundEntry.to_string()] = Json::objectValue);
199 entry[jss::local] = localBalance;
200 entry[jss::remote] = inboundEntry.remote_balance;
201 entry[jss::type] = "inbound";
202 }
203 }
// Same report for the outbound list.
204 for (auto& outboundEntry : outbound_)
205 {
206 int localBalance = outboundEntry.local_balance.value(now);
207 if ((localBalance + outboundEntry.remote_balance) >= threshold)
208 {
209 Json::Value& entry = (ret[outboundEntry.to_string()] = Json::objectValue);
210 entry[jss::local] = localBalance;
211 entry[jss::remote] = outboundEntry.remote_balance;
212 entry[jss::type] = "outbound";
213 }
214 }
// Same report for the admin list.
215 for (auto& adminEntry : admin_)
216 {
217 int localBalance = adminEntry.local_balance.value(now);
218 if ((localBalance + adminEntry.remote_balance) >= threshold)
219 {
220 Json::Value& entry = (ret[adminEntry.to_string()] = Json::objectValue);
221 entry[jss::local] = localBalance;
222 entry[jss::remote] = adminEntry.remote_balance;
223 entry[jss::type] = "admin";
224 }
225 }
226
227 return ret;
228 }
229
// Produces a Gossip snapshot of the inbound list: one item (local balance and
// key address) for every inbound entry whose local balance at `now` is at
// least minimumGossipBalance. Outbound and admin entries are not exported.
// NOTE(review): the function name/parameters (original line 231) and original
// lines 233 and 236 -- including the declaration of `now` -- are missing from
// this listing.
230 Gossip
232 {
234
235 Gossip gossip;
237
// Upper bound on the item count; entries below the minimum are skipped.
238 gossip.items.reserve(inbound_.size());
239
240 for (auto& inboundEntry : inbound_)
241 {
242 Gossip::Item item;
243 item.balance = inboundEntry.local_balance.value(now);
244 if (item.balance >= minimumGossipBalance)
245 {
246 item.address = inboundEntry.key->address;
247 gossip.items.push_back(item);
248 }
249 }
250
251 return gossip;
252 }
253
254 //--------------------------------------------------------------------------
255
// Applies gossip received from `origin`. Every gossip item is turned into an
// inbound Consumer (via newInboundEndpoint) whose entry has the item's balance
// added to remote_balance. The resulting Import is stored in importTable_
// under `origin`; if an import from the same origin was already stored, its
// balances are deducted and it is replaced.
// NOTE(review): original lines 261 and 263 are missing from this listing.
256 void
257 importConsumers(std::string const& origin, Gossip const& gossip)
258 {
259 auto const elapsed = m_clock.now();
260 {
// resultInserted distinguishes a first import from this origin from a repeat.
262 auto [resultIt, resultInserted] = importTable_.emplace(
264 std::make_tuple(origin), // Key
265 std::make_tuple(m_clock.now().time_since_epoch().count())); // Import
266
267 if (resultInserted)
268 {
269 // This is a new import
// Fill the freshly created table slot in place.
270 Import& next(resultIt->second);
271 next.whenExpires = elapsed + gossipExpirationSeconds;
272 next.items.reserve(gossip.items.size());
273
274 for (auto const& gossipItem : gossip.items)
275 {
276 Import::Item item;
277 item.balance = gossipItem.balance;
278 item.consumer = newInboundEndpoint(gossipItem.address);
// Credit the remotely reported balance to the consumer's entry.
279 item.consumer.entry().remote_balance += item.balance;
280 next.items.push_back(item);
281 }
282 }
283 else
284 {
285 // Previous import exists so add the new remote
286 // balances and then deduct the old remote balances.
287
// Build the replacement in a local first; the stored import is untouched until
// the swap at the end.
288 Import next;
289 next.whenExpires = elapsed + gossipExpirationSeconds;
290 next.items.reserve(gossip.items.size());
291 for (auto const& gossipItem : gossip.items)
292 {
293 Import::Item item;
294 item.balance = gossipItem.balance;
295 item.consumer = newInboundEndpoint(gossipItem.address);
296 item.consumer.entry().remote_balance += item.balance;
297 next.items.push_back(item);
298 }
299
// Undo the contribution of the import being replaced.
300 Import& prev(resultIt->second);
301 for (auto& item : prev.items)
302 {
303 item.consumer.entry().remote_balance -= item.balance;
304 }
305
// After the swap the table slot holds the new import, and the old one lives
// in the local `next`, which is destroyed when this scope ends.
306 std::swap(next, prev);
307 }
308 }
309 }
310
311 //--------------------------------------------------------------------------
312
313 // Called periodically to expire entries and groom the table.
314 //
// Two grooming passes against a single clock reading:
//  1. erase entries at the front of inactive_ whose whenExpires has passed;
//  2. for every expired import, deduct its balances from the referenced
//     entries' remote_balance and remove it from importTable_.
// NOTE(review): the function name (original line 316) and original line 318
// are missing from this listing.
315 void
317 {
319
320 auto const elapsed = m_clock.now();
321
322 for (auto iter(inactive_.begin()); iter != inactive_.end();)
323 {
324 if (iter->whenExpires <= elapsed)
325 {
326 JLOG(m_journal.debug()) << "Expired " << *iter;
327 auto table_iter = table_.find(*iter->key);
// Advance before erasing: erase() removes the entry `iter` refers to.
328 ++iter;
329 erase(table_iter);
330 }
331 else
332 {
// Stop at the first unexpired entry. The release path appends entries with
// whenExpires = now + secondsUntilExpiration, so the list is presumably in
// expiry order -- confirm.
333 break;
334 }
335 }
336
337 auto iter = importTable_.begin();
338 while (iter != importTable_.end())
339 {
340 Import& import(iter->second);
341 if (iter->second.whenExpires <= elapsed)
342 {
// Remove this import's contribution from every entry it credited.
343 for (auto item_iter(import.items.begin()); item_iter != import.items.end(); ++item_iter)
344 {
345 item_iter->consumer.entry().remote_balance -= item_iter->balance;
346 }
347
// Continue from the iterator returned by erase.
348 iter = importTable_.erase(iter);
349 }
350 else
351 ++iter;
352 }
353 }
354
355 //--------------------------------------------------------------------------
356
357 // Returns the disposition based on the balance and thresholds
// Maps a balance to a Disposition: drop when it is at or above dropThreshold,
// otherwise warn or ok.
// NOTE(review): the signature (original line 359) and the condition guarding
// the warn return (original line 364) are missing from this listing, so as
// displayed the final `return Disposition::ok` looks unreachable. Presumably
// the missing line compares the balance against warningThreshold -- confirm.
358 static Disposition
360 {
361 if (balance >= dropThreshold)
362 return Disposition::drop;
363
365 return Disposition::warn;
366
367 return Disposition::ok;
368 }
369
// Removes the entry at `iter` from table_. The assertion requires that the
// entry has no remaining references.
// NOTE(review): original lines 373 and 376 are missing from this listing.
370 void
371 erase(Table::iterator iter)
372 {
374 Entry& entry(iter->second);
375 XRPL_ASSERT(entry.refcount == 0, "xrpl::Resource::Logic::erase : entry not used");
377 table_.erase(iter);
378 }
379
// Adds one reference to `entry`.
// NOTE(review): the signature (original line 381) and original line 383 are
// missing from this listing.
380 void
382 {
384 ++entry.refcount;
385 }
386
// Drops one reference from `entry`. When the count reaches zero the entry is
// logged as inactive, appended to inactive_, and given an expiry time of
// now + secondsUntilExpiration.
// NOTE(review): the signature (original line 388), original line 390, and the
// statement of each `case` (original lines 398, 401, 404) are missing from
// this listing. Since an Entry may be on only one intrusive list at a time,
// those lines presumably unlink it from its active list -- confirm.
387 void
389 {
391 if (--entry.refcount == 0)
392 {
393 JLOG(m_journal.debug()) << "Inactive " << entry;
394
395 switch (entry.key->kind)
396 {
397 case kindInbound:
399 break;
400 case kindOutbound:
402 break;
403 case kindUnlimited:
405 break;
406 default:
407 // LCOV_EXCL_START
408 UNREACHABLE(
409 "xrpl::Resource::Logic::release : invalid entry "
410 "kind");
411 break;
412 // LCOV_EXCL_STOP
413 }
414 inactive_.push_back(entry);
415 entry.whenExpires = m_clock.now() + secondsUntilExpiration;
416 }
417 }
418
// Adds fee.cost() to `entry`, logs the charge, and returns disposition() of
// the resulting balance. `context`, when non-empty, is appended to the log
// line wrapped in " (...)".
// NOTE(review): the return-type line (original line 419) and original lines
// 440-441, including the declaration of `now`, are missing from this listing.
420 charge(Entry& entry, Charge const& fee, std::string context = {})
421 {
// Cost thresholds that select the log severity for a charge.
422 static constexpr Charge::value_type feeLogAsWarn = 3000;
423 static constexpr Charge::value_type feeLogAsInfo = 1000;
424 static constexpr Charge::value_type feeLogAsDebug = 100;
// Thresholds must be strictly descending, and the lowest must exceed 10.
425 static_assert(feeLogAsWarn > feeLogAsInfo && feeLogAsInfo > feeLogAsDebug && feeLogAsDebug > 10);
426
// Chooses the journal stream for a cost; anything below feeLogAsDebug goes to
// trace.
427 static auto getStream = [](Resource::Charge::value_type cost, beast::Journal& journal) {
428 if (cost >= feeLogAsWarn)
429 return journal.warn();
430 if (cost >= feeLogAsInfo)
431 return journal.info();
432 if (cost >= feeLogAsDebug)
433 return journal.debug();
434 return journal.trace();
435 };
436
437 if (!context.empty())
438 context = " (" + context + ")";
439
442 int const balance(entry.add(fee.cost(), now));
443 JLOG(getStream(fee.cost(), m_journal)) << "Charging " << entry << " for " << fee << context;
444 return disposition(balance);
445 }
446
// Decides whether `entry` should be warned. Returns false for unlimited
// entries. Otherwise, if the balance is at or above warningThreshold and the
// entry was not already warned at this same clock reading, it charges
// feeWarning, records the warning time, logs, bumps the warn meter and
// returns true.
// NOTE(review): original line 453 is missing from this listing.
447 bool
448 warn(Entry& entry)
449 {
450 if (entry.isUnlimited())
451 return false;
452
454 bool notify(false);
455 auto const elapsed = m_clock.now();
// The comparison against lastWarningTime suppresses a repeat warning for the
// same clock value.
456 if (entry.balance(m_clock.now()) >= warningThreshold && elapsed != entry.lastWarningTime)
457 {
458 charge(entry, feeWarning);
459 notify = true;
460 entry.lastWarningTime = elapsed;
461 }
462 if (notify)
463 {
464 JLOG(m_journal.info()) << "Load warning: " << entry;
465 ++m_stats.warn;
466 }
467 return notify;
468 }
469
// Decides whether `entry` should be dropped. Returns false for unlimited
// entries. Otherwise, if the balance is at or above dropThreshold, it logs a
// warning, charges feeDrop, bumps the drop meter and returns true.
// NOTE(review): the signature (original line 471) and original lines 476 and
// 478, including the declaration of `now`, are missing from this listing.
470 bool
472 {
473 if (entry.isUnlimited())
474 return false;
475
477 bool drop(false);
479 int const balance(entry.balance(now));
480 if (balance >= dropThreshold)
481 {
482 JLOG(m_journal.warn()) << "Consumer entry " << entry << " dropped with balance " << balance
483 << " at or above drop threshold " << dropThreshold;
484
485 // Adding feeDrop at this point keeps the dropped connection
486 // from re-connecting for at least a little while after it is
487 // dropped.
488 charge(entry, feeDrop);
489 ++m_stats.drop;
490 drop = true;
491 }
492 return drop;
493 }
494
// Returns the entry's balance evaluated at the current clock reading.
// NOTE(review): the signature (original line 496) and original line 498 are
// missing from this listing.
495 int
497 {
499 return entry.balance(m_clock.now());
500 }
501
502 //--------------------------------------------------------------------------
503
// Writes one PropertyStream map per entry of `list` into the set `items`:
// "name" and "balance" always, "count" only when the entry has references,
// and "remote_balance" only when it is non-zero.
// NOTE(review): the signature (original line 505) is missing from this
// listing.
504 void
506 {
507 for (auto& entry : list)
508 {
// Each map is constructed against the enclosing set.
509 beast::PropertyStream::Map item(items);
510 if (entry.refcount != 0)
511 item["count"] = entry.refcount;
512 item["name"] = entry.to_string();
513 item["balance"] = entry.balance(now);
514 if (entry.remote_balance != 0)
515 item["remote_balance"] = entry.remote_balance;
516 }
517 }
518
// Dumps all four entry lists into `map` as the sets "inbound", "outbound",
// "admin" and "inactive", using writeList for each.
// NOTE(review): the signature (original line 520) and original lines 522 and
// 524, including the declaration of `now`, are missing from this listing.
519 void
521 {
523
525
// Each set lives in its own scope, so it is destroyed before the next begins.
526 {
527 beast::PropertyStream::Set s("inbound", map);
528 writeList(now, s, inbound_);
529 }
530
531 {
532 beast::PropertyStream::Set s("outbound", map);
533 writeList(now, s, outbound_);
534 }
535
536 {
537 beast::PropertyStream::Set s("admin", map);
538 writeList(now, s, admin_);
539 }
540
541 {
542 beast::PropertyStream::Set s("inactive", map);
543 writeList(now, s, inactive_);
544 }
545 }
546};
547
548} // namespace Resource
549} // namespace xrpl
T begin(T... args)
Represents a JSON value.
Definition json_value.h:130
A version-independent IP address and port combination.
Definition IPEndpoint.h:18
Address const & address() const
Returns the address portion of this endpoint.
Definition IPEndpoint.h:55
Endpoint at_port(Port port) const
Returns a new Endpoint with a different port.
Definition IPEndpoint.h:48
A generic endpoint for log messages.
Definition Journal.h:40
Stream debug() const
Definition Journal.h:300
Stream info() const
Definition Journal.h:306
Stream warn() const
Definition Journal.h:312
iterator iterator_to(T &element) const noexcept
Obtain an iterator from an element.
Definition List.h:540
iterator push_back(T &element) noexcept
Append an element at the end of the list.
Definition List.h:487
iterator begin() noexcept
Obtain an iterator to the beginning of the list.
Definition List.h:345
iterator end() noexcept
Obtain an iterator to the end of the list.
Definition List.h:372
size_type size() const noexcept
Returns the number of elements in the list.
Definition List.h:296
iterator erase(iterator pos) noexcept
Remove an element.
Definition List.h:450
virtual time_point now() const =0
Returns the current time.
A metric for measuring an integral value.
Definition Meter.h:18
A consumption charge.
Definition Charge.h:10
value_type cost() const
Return the cost of the charge in Resource::Manager units.
Definition Charge.cpp:22
int value_type
The type used to hold a consumption charge.
Definition Charge.h:13
An endpoint that consumes resources.
Definition Consumer.h:16
Consumer newInboundEndpoint(beast::IP::Endpoint const &address)
Consumer newOutboundEndpoint(beast::IP::Endpoint const &address)
Consumer newUnlimitedEndpoint(beast::IP::Endpoint const &address)
Create endpoint that should not have resource limits applied.
void writeList(clock_type::time_point const now, beast::PropertyStream::Set &items, EntryIntrusiveList &list)
Disposition charge(Entry &entry, Charge const &fee, std::string context={})
void importConsumers(std::string const &origin, Gossip const &gossip)
static Disposition disposition(int balance)
void onWrite(beast::PropertyStream::Map &map)
Json::Value getJson(int threshold)
Returns a Json::objectValue.
Logic(beast::insight::Collector::ptr const &collector, clock_type &clock, beast::Journal journal)
T clear(T... args)
T emplace(T... args)
T end(T... args)
T erase(T... args)
T find(T... args)
T make_tuple(T... args)
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:26
Charge const feeWarning
Charge const feeDrop
Disposition
The disposition of a consumer after applying a load charge.
Definition Disposition.h:7
@ warn
Consumer should be warned that consumption is high.
Definition Disposition.h:13
@ ok
No action required.
Definition Disposition.h:9
std::chrono::seconds constexpr gossipExpirationSeconds
std::chrono::seconds constexpr secondsUntilExpiration
@ kindInbound
Definition Kind.h:14
@ kindUnlimited
Definition Kind.h:14
@ kindOutbound
Definition Kind.h:14
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
beast::abstract_clock< std::chrono::steady_clock > Stopwatch
A clock for measuring elapsed time.
Definition chrono.h:86
T piecewise_construct
Describes a single consumer.
Definition Gossip.h:17
beast::IP::Endpoint address
Definition Gossip.h:21
Data format for exchanging consumption information across peers.
Definition Gossip.h:12
std::vector< Item > items
Definition Gossip.h:24
A set of imported consumer data from a gossip origin.
Definition Import.h:11
Stats(beast::insight::Collector::ptr const &collector)
T swap(T... args)