rippled
Loading...
Searching...
No Matches
LoadManager.cpp
1#include <xrpld/app/main/Application.h>
2#include <xrpld/app/main/LoadManager.h>
3#include <xrpld/app/misc/LoadFeeTrack.h>
4#include <xrpld/app/misc/NetworkOPs.h>
5
6#include <xrpl/beast/core/CurrentThreadName.h>
7#include <xrpl/json/to_string.h>
8
9#include <memory>
10#include <mutex>
11#include <thread>
12
13namespace xrpl {
14
16 : app_(app), journal_(journal), lastHeartbeat_(), armed_(false)
17{
18}
19
21{
22 try
23 {
24 stop();
25 }
26 catch (std::exception const& ex)
27 {
28 // Swallow the exception in a destructor.
29 JLOG(journal_.warn())
30 << "std::exception in ~LoadManager. " << ex.what();
31 }
32}
33
34//------------------------------------------------------------------------------
35
36void
43
44void
51
52//------------------------------------------------------------------------------
53
54void
56{
57 JLOG(journal_.debug()) << "Starting";
58 XRPL_ASSERT(
59 !thread_.joinable(), "xrpl::LoadManager::start : thread not joinable");
60
62}
63
64void
66{
67 {
69 stop_ = true;
70 // There is at most one thread waiting on this condition.
72 }
73 if (thread_.joinable())
74 {
75 JLOG(journal_.debug()) << "Stopping";
76 thread_.join();
77 }
78}
79
80//------------------------------------------------------------------------------
81
82void
84{
85 beast::setCurrentThreadName("LoadManager");
86
87 using namespace std::chrono_literals;
88 using clock_type = std::chrono::steady_clock;
89
90 auto t = clock_type::now();
91
92 while (true)
93 {
94 t += 1s;
95
97 if (cv_.wait_until(sl, t, [this] { return stop_; }))
98 break;
99
100 // Copy out shared data under a lock. Use copies outside lock.
101 auto const lastHeartbeat = lastHeartbeat_;
102 auto const armed = armed_;
103 sl.unlock();
104
105 // Measure the amount of time we have been stalled, in seconds.
106 using namespace std::chrono;
107 auto const timeSpentStalled =
108 duration_cast<seconds>(steady_clock::now() - lastHeartbeat);
109
110 constexpr auto reportingIntervalSeconds = 10s;
111 constexpr auto stallFatalLogMessageTimeLimit = 90s;
112 constexpr auto stallLogicErrorTimeLimit = 600s;
113
114 if (armed && (timeSpentStalled >= reportingIntervalSeconds))
115 {
116 // Report the stalled condition every reportingIntervalSeconds
117 if ((timeSpentStalled % reportingIntervalSeconds) == 0s)
118 {
119 if (timeSpentStalled < stallFatalLogMessageTimeLimit)
120 {
121 JLOG(journal_.warn())
122 << "Server stalled for " << timeSpentStalled.count()
123 << " seconds.";
124
126 {
127 JLOG(journal_.warn())
128 << "JobQueue: " << app_.getJobQueue().getJson(0);
129 }
130 }
131 else
132 {
133 JLOG(journal_.fatal())
134 << "Server stalled for " << timeSpentStalled.count()
135 << " seconds.";
136 JLOG(journal_.fatal())
137 << "JobQueue: " << app_.getJobQueue().getJson(0);
138 }
139 }
140
141 // If we go over the stallLogicErrorTimeLimit spent stalled, it
142 // means that the stall resolution code has failed, which qualifies
143 // as a LogicError
144 if (timeSpentStalled >= stallLogicErrorTimeLimit)
145 {
146 JLOG(journal_.fatal())
147 << "LogicError: Fatal server stall detected. Stalled time: "
148 << timeSpentStalled.count() << "s";
149 JLOG(journal_.fatal())
150 << "JobQueue: " << app_.getJobQueue().getJson(0);
151 LogicError("Fatal server stall detected");
152 }
153 }
154 }
155
156 bool change = false;
158 {
159 JLOG(journal_.info()) << "Raising local fee (JQ overload): "
160 << app_.getJobQueue().getJson(0);
161 change = app_.getFeeTrack().raiseLocalFee();
162 }
163 else
164 {
165 change = app_.getFeeTrack().lowerLocalFee();
166 }
167
168 if (change)
169 {
170 // VFALCO TODO replace this with a Listener / observer and
171 // subscribe in NetworkOPs or Application.
173 }
174}
175
176//------------------------------------------------------------------------------
177
180{
181 return std::unique_ptr<LoadManager>{new LoadManager{app, journal}};
182}
183
184} // namespace xrpl
A generic endpoint for log messages.
Definition Journal.h:41
Stream fatal() const
Definition Journal.h:333
Stream debug() const
Definition Journal.h:309
Stream info() const
Definition Journal.h:315
Stream warn() const
Definition Journal.h:321
virtual LoadFeeTrack & getFeeTrack()=0
virtual JobQueue & getJobQueue()=0
virtual NetworkOPs & getOPs()=0
bool isOverloaded()
Definition JobQueue.cpp:185
Json::Value getJson(int c=0)
Definition JobQueue.cpp:193
Manages load sources.
Definition LoadManager.h:27
std::chrono::steady_clock::time_point lastHeartbeat_
Definition LoadManager.h:87
std::condition_variable cv_
Definition LoadManager.h:83
beast::Journal const journal_
Definition LoadManager.h:79
std::thread thread_
Definition LoadManager.h:81
~LoadManager()
Destroy the manager.
std::mutex mutex_
Definition LoadManager.h:82
void heartbeat()
Reset the stall detection timer.
void activateStallDetector()
Turn on stall detection.
LoadManager()=delete
Application & app_
Definition LoadManager.h:78
virtual void reportFeeChange()=0
T join(T... args)
T joinable(T... args)
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
void LogicError(std::string const &how) noexcept
Called when faulty logic causes a broken invariant.
std::unique_ptr< LoadManager > make_LoadManager(Application &app, beast::Journal journal)
T unlock(T... args)
T what(T... args)