xrpld
Loading...
Searching...
No Matches
LoadManager.cpp
1#include <xrpld/app/main/LoadManager.h>
2
3#include <xrpld/app/main/Application.h>
4
5#include <xrpl/basics/Log.h>
6#include <xrpl/basics/contract.h>
7#include <xrpl/beast/core/CurrentThreadName.h>
8#include <xrpl/beast/utility/Journal.h>
9#include <xrpl/beast/utility/instrumentation.h>
10#include <xrpl/json/to_string.h> // IWYU pragma: keep
11#include <xrpl/server/LoadFeeTrack.h>
12#include <xrpl/server/NetworkOPs.h>
13
14#include <chrono>
15#include <exception>
16#include <memory>
17#include <mutex>
18#include <thread>
19
20namespace xrpl {
21
23 : app_(app), journal_(journal), armed_(false)
24{
25}
26
28{
29 try
30 {
31 stop();
32 }
33 catch (std::exception const& ex)
34 {
35 // Swallow the exception in a destructor.
36 JLOG(journal_.warn()) << "std::exception in ~LoadManager. " << ex.what();
37 }
38}
39
40//------------------------------------------------------------------------------
41
42void
49
50void
57
58//------------------------------------------------------------------------------
59
60void
62{
63 JLOG(journal_.debug()) << "Starting";
64 XRPL_ASSERT(!thread_.joinable(), "xrpl::LoadManager::start : thread not joinable");
65
67}
68
69void
71{
72 {
73 std::scoped_lock const lock(mutex_);
74 stop_ = true;
75 // There is at most one thread waiting on this condition.
76 cv_.notify_all();
77 }
78 if (thread_.joinable())
79 {
80 JLOG(journal_.debug()) << "Stopping";
81 thread_.join();
82 }
83}
84
85//------------------------------------------------------------------------------
86
87void
89{
90 beast::setCurrentThreadName("LoadManager");
91
92 using namespace std::chrono_literals;
93 using clock_type = std::chrono::steady_clock;
94
95 auto t = clock_type::now();
96
97 while (true)
98 {
99 t += 1s;
100
102 if (cv_.wait_until(sl, t, [this] { return stop_; }))
103 break;
104
105 // Copy out shared data under a lock. Use copies outside lock.
106 auto const lastHeartbeat = lastHeartbeat_;
107 auto const armed = armed_;
108 sl.unlock();
109
110 // Measure the amount of time we have been stalled, in seconds.
111 using namespace std::chrono;
112 auto const timeSpentStalled = duration_cast<seconds>(steady_clock::now() - lastHeartbeat);
113
114 static constexpr auto kReportingIntervalSeconds = 10s;
115 static constexpr auto kStallFatalLogMessageTimeLimit = 90s;
116 static constexpr auto kStallLogicErrorTimeLimit = 600s;
117
118 if (armed && (timeSpentStalled >= kReportingIntervalSeconds))
119 {
120 // Report the stalled condition every reportingIntervalSeconds
121 if ((timeSpentStalled % kReportingIntervalSeconds) == 0s)
122 {
123 if (timeSpentStalled < kStallFatalLogMessageTimeLimit)
124 {
125 JLOG(journal_.warn())
126 << "Server stalled for " << timeSpentStalled.count() << " seconds.";
127
128 if (app_.getJobQueue().isOverloaded())
129 {
130 JLOG(journal_.warn()) << "JobQueue: " << app_.getJobQueue().getJson(0);
131 }
132 }
133 else
134 {
135 JLOG(journal_.fatal())
136 << "Server stalled for " << timeSpentStalled.count() << " seconds.";
137 JLOG(journal_.fatal()) << "JobQueue: " << app_.getJobQueue().getJson(0);
138 }
139 }
140
141 // If we go over the stallLogicErrorTimeLimit spent stalled, it
142 // means that the stall resolution code has failed, which qualifies
143 // as a LogicError
144 if (timeSpentStalled >= kStallLogicErrorTimeLimit)
145 {
146 JLOG(journal_.fatal()) << "LogicError: Fatal server stall detected. Stalled time: "
147 << timeSpentStalled.count() << "s";
148 JLOG(journal_.fatal()) << "JobQueue: " << app_.getJobQueue().getJson(0);
149 logicError("Fatal server stall detected");
150 }
151 }
152 }
153
154 bool change = false;
155 if (app_.getJobQueue().isOverloaded())
156 {
157 JLOG(journal_.info()) << "Raising local fee (JQ overload): "
158 << app_.getJobQueue().getJson(0);
159 change = app_.getFeeTrack().raiseLocalFee();
160 }
161 else
162 {
163 change = app_.getFeeTrack().lowerLocalFee();
164 }
165
166 if (change)
167 {
168 // VFALCO TODO replace this with a Listener / observer and
169 // subscribe in NetworkOPs or Application.
170 app_.getOPs().reportFeeChange();
171 }
172}
173
174//------------------------------------------------------------------------------
175
178{
179 return std::unique_ptr<LoadManager>{new LoadManager{app, journal}};
180}
181
182} // namespace xrpl
A generic endpoint for log messages.
Definition Journal.h:38
LoadManager(Application &app, beast::Journal journal)
std::chrono::steady_clock::time_point lastHeartbeat_
Definition LoadManager.h:87
std::condition_variable cv_
Definition LoadManager.h:83
beast::Journal const journal_
Definition LoadManager.h:79
std::thread thread_
Definition LoadManager.h:81
~LoadManager()
Destroy the manager.
std::mutex mutex_
Definition LoadManager.h:82
void heartbeat()
Reset the stall detection timer.
void activateStallDetector()
Turn on stall detection.
LoadManager()=delete
Application & app_
Definition LoadManager.h:78
T duration_cast(T... args)
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
void logicError(std::string const &how) noexcept
Called when faulty logic causes a broken invariant.
std::unique_ptr< LoadManager > makeLoadManager(Application &app, beast::Journal journal)
T unlock(T... args)
T what(T... args)