rippled
Loading...
Searching...
No Matches
ResourceManager.cpp
1#include <xrpl/basics/chrono.h>
2#include <xrpl/beast/core/CurrentThreadName.h>
3#include <xrpl/beast/insight/Collector.h>
4#include <xrpl/beast/net/IPAddressConversion.h>
5#include <xrpl/beast/net/IPEndpoint.h>
6#include <xrpl/beast/utility/Journal.h>
7#include <xrpl/beast/utility/PropertyStream.h>
8#include <xrpl/json/json_value.h>
9#include <xrpl/resource/Consumer.h>
10#include <xrpl/resource/Gossip.h>
11#include <xrpl/resource/ResourceManager.h>
12#include <xrpl/resource/detail/Logic.h>
13
14#include <boost/asio/ip/address.hpp>
15#include <boost/system/detail/error_code.hpp>
16
17#include <chrono>
18#include <condition_variable>
19#include <memory>
20#include <mutex>
21#include <string>
22#include <string_view>
23#include <thread>
24
25namespace ripple {
26namespace Resource {
27
// Concrete implementation of Resource::Manager.
//
// Every public operation visible below is a thin forward to `logic_`.
// The class also uses a `thread_` member (joined in the destructor) and a
// `stop_` flag that ends the loop in run().
//
// NOTE(review): this listing is incomplete -- several numbered lines are
// absent (e.g. 31-33, 35-36, 39, 44, 49, 55, 57, 62, 68-69, 72, 81, 88,
// 91-92, 97-98, 104, 117, 123, 132, 141, 146-148). Member declarations other
// than `stop_`, and parts of some signatures and bodies, are therefore not
// visible here; confirm against the full source before relying on them.
28class ManagerImp : public Manager
29{
30private:
// Set to true by the destructor; checked by run() to leave its loop.
34 bool stop_ = false;
37
38public:
// Constructor parameters: the insight collector and the journal.
// Copies `journal` into `journal_` and builds `logic_` from the collector,
// stopwatch() and the journal.
// NOTE(review): listing line 44 (the only body line) is absent --
// presumably it starts the thread that the destructor joins; confirm.
40 beast::insight::Collector::ptr const& collector,
41 beast::Journal journal)
42 : journal_(journal), logic_(collector, stopwatch(), journal)
43 {
45 }
46
// Not default-constructible and not copyable.
47 ManagerImp() = delete;
48 ManagerImp(ManagerImp const&) = delete;
50 operator=(ManagerImp const&) = delete;
51
// Sets stop_ and then blocks until thread_ has finished.
// NOTE(review): listing lines 55 and 57 inside the inner scope are absent --
// presumably a lock acquisition and a condition-variable notification;
// confirm against the full source.
52 ~ManagerImp() override
53 {
54 {
56 stop_ = true;
58 }
59 thread_.join();
60 }
61
// Returns the consumer that logic_ creates for the given inbound address.
63 newInboundEndpoint(beast::IP::Endpoint const& address) override
64 {
65 return logic_.newInboundEndpoint(address);
66 }
67
// Proxy-aware overload of newInboundEndpoint.
//  - When `proxy` is false, behaves exactly like the single-argument overload.
//  - Otherwise parses `forwardedFor` as an IP address; if parsing fails a
//    message is streamed out and the proxy's own `address` is used instead.
//  - On success the parsed address is passed on to the single-argument
//    overload (the argument expression on listing line 88 is absent).
// NOTE(review): listing lines 68-69, 72, 81 and 88 are absent, so the
// function header, the log stream being written to, and the final
// conversion call are not visible here.
70 beast::IP::Endpoint const& address,
71 bool const proxy,
73 {
74 if (!proxy)
75 return newInboundEndpoint(address);
76
// Non-throwing parse: failure is reported through `ec`.
77 boost::system::error_code ec;
78 auto const proxiedIp = boost::asio::ip::make_address(forwardedFor, ec);
79 if (ec)
80 {
82 << "forwarded for (" << forwardedFor << ") from proxy "
83 << address.to_string()
84 << " doesn't convert to IP endpoint: " << ec.message();
// Fall back to the directly connected address.
85 return newInboundEndpoint(address);
86 }
87 return newInboundEndpoint(
89 }
90
// Body of the outbound-endpoint override (header lines 91-92 are absent):
// forwards to logic_.newOutboundEndpoint.
93 {
94 return logic_.newOutboundEndpoint(address);
95 }
96
// Body of the unlimited-endpoint override (header lines 97-98 are absent):
// forwards to logic_.newUnlimitedEndpoint.
99 {
100 return logic_.newUnlimitedEndpoint(address);
101 }
102
// Returns the Gossip produced by logic_.exportConsumers()
// (the function-name line 104 is absent from this listing).
103 Gossip
105 {
106 return logic_.exportConsumers();
107 }
108
// Hands the gossip received from `origin` to logic_.importConsumers.
109 void
110 importConsumers(std::string const& origin, Gossip const& gossip) override
111 {
112 logic_.importConsumers(origin, gossip);
113 }
114
115 //--------------------------------------------------------------------------
116
// Returns logic_.getJson() unchanged (return-type line 117 is absent).
118 getJson() override
119 {
120 return logic_.getJson();
121 }
122
// Returns logic_.getJson(threshold) unchanged (return-type line 123 is
// absent). The meaning of `threshold` is defined by logic_, not here.
124 getJson(int threshold) override
125 {
126 return logic_.getJson(threshold);
127 }
128
129 //--------------------------------------------------------------------------
130
// Property-stream hook (function-name line 132 is absent): forwards `map`
// to logic_.onWrite.
131 void
133 {
134 logic_.onWrite(map);
135 }
136
137 //--------------------------------------------------------------------------
138
139private:
// Loop function (name line 141 is absent). Names the current thread
// "Resource::Manager", then loops until stop_ is observed true.
// NOTE(review): listing lines 146-148 at the top of the loop body are
// absent -- presumably the periodic work and a timed wait; confirm.
140 void
142 {
143 beast::setCurrentThreadName("Resource::Manager");
144 for (;;)
145 {
149 if (stop_)
150 break;
151 }
152 }
153};
154
155//------------------------------------------------------------------------------
156
// Base-class constructor: initializes the beast::PropertyStream::Source base
// with the source name "resource".
157Manager::Manager() : beast::PropertyStream::Source("resource")
158{
159}
160
// Destructor is defaulted here, out of line, in this translation unit.
161Manager::~Manager() = default;
162
163//------------------------------------------------------------------------------
164
// Factory: constructs a ManagerImp from `collector` and `journal` with
// std::make_unique and returns it.
// NOTE(review): listing lines 165-166 (return type and function name) are
// absent from this extract; confirm the exact declaration in the header.
167 beast::insight::Collector::ptr const& collector,
168 beast::Journal journal)
169{
170 return std::make_unique<ManagerImp>(collector, journal);
171}
172
173} // namespace Resource
174} // namespace ripple
Represents a JSON value.
Definition json_value.h:130
A version-independent IP address and port combination.
Definition IPEndpoint.h:19
std::string to_string() const
Returns a string representing the endpoint.
A generic endpoint for log messages.
Definition Journal.h:41
Stream warn() const
Definition Journal.h:321
An endpoint that consumes resources.
Definition Consumer.h:17
Consumer newInboundEndpoint(beast::IP::Endpoint const &address)
void importConsumers(std::string const &origin, Gossip const &gossip)
Consumer newUnlimitedEndpoint(beast::IP::Endpoint const &address)
Create endpoint that should not have resource limits applied.
void onWrite(beast::PropertyStream::Map &map)
Consumer newOutboundEndpoint(beast::IP::Endpoint const &address)
std::condition_variable cond_
Json::Value getJson() override
Extract consumer information for reporting.
Consumer newOutboundEndpoint(beast::IP::Endpoint const &address) override
Create a new endpoint keyed by outbound IP address and port.
Consumer newUnlimitedEndpoint(beast::IP::Endpoint const &address) override
Create a new unlimited endpoint keyed by forwarded IP.
Gossip exportConsumers() override
Extract packaged consumer information for export.
ManagerImp(ManagerImp const &)=delete
void onWrite(beast::PropertyStream::Map &map) override
Subclass override.
Json::Value getJson(int threshold) override
Consumer newInboundEndpoint(beast::IP::Endpoint const &address, bool const proxy, std::string_view forwardedFor) override
Consumer newInboundEndpoint(beast::IP::Endpoint const &address) override
Create a new endpoint keyed by inbound IP address or the forwarded IP if proxied.
beast::Journal const journal_
ManagerImp & operator=(ManagerImp const &)=delete
void importConsumers(std::string const &origin, Gossip const &gossip) override
Import packaged consumer information.
ManagerImp(beast::insight::Collector::ptr const &collector, beast::Journal journal)
Tracks load and resource consumption.
T is_same_v
T join(T... args)
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
std::unique_ptr< Manager > make_Manager(beast::insight::Collector::ptr const &collector, beast::Journal journal)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
std::string_view forwardedFor(http_request_type const &request)
Definition Role.cpp:243
Stopwatch & stopwatch()
Returns an instance of a wall clock.
Definition chrono.h:100
static IP::Endpoint from_asio(boost::asio::ip::address const &address)
Data format for exchanging consumption information across peers.
Definition Gossip.h:13