xrpld
Loading...
Searching...
No Matches
io_latency_probe.h
1#pragma once
2
3#include <xrpl/beast/utility/instrumentation.h>
4
5#include <boost/asio/basic_waitable_timer.hpp>
6#include <boost/asio/io_context.hpp>
7#include <boost/asio/post.hpp>
8
9#include <chrono>
10#include <condition_variable>
11#include <mutex>
12#include <stdexcept>
13
14namespace beast {
15
17template <class Clock>
19{
20private:
21 using duration = Clock::duration;
22 using time_point = Clock::time_point;
23
28 boost::asio::io_context& ios_;
29 boost::asio::basic_waitable_timer<std::chrono::steady_clock> timer_;
30 bool cancel_{false};
31
32public:
33 IOLatencyProbe(duration const& period, boost::asio::io_context& ios)
34 : period_(period), ios_(ios), timer_(ios_)
35 {
36 }
37
39 {
40 std::unique_lock<decltype(mutex_)> lock(mutex_);
41 cancel(lock, true);
42 }
43
46 boost::asio::io_context&
48 {
49 return ios_;
50 }
51
52 [[nodiscard]] boost::asio::io_context const&
54 {
55 return ios_;
56 }
57
58
63 void
65 {
66 std::unique_lock<decltype(mutex_)> lock(mutex_);
67 cancel(lock, true);
68 }
69
70 void
72 {
73 std::unique_lock<decltype(mutex_)> lock(mutex_);
74 cancel(lock, false);
75 }
76
77
82 template <class Handler>
83 void
84 sampleOne(Handler&& handler)
85 {
86 std::scoped_lock const lock(mutex_);
87 if (cancel_)
88 throw std::logic_error("IOLatencyProbe is canceled");
89 boost::asio::post(
90 ios_, SampleOp<Handler>(std::forward<Handler>(handler), Clock::now(), false, this));
91 }
92
97 template <class Handler>
98 void
99 sample(Handler&& handler)
100 {
101 std::scoped_lock const lock(mutex_);
102 if (cancel_)
103 throw std::logic_error("IOLatencyProbe is canceled");
104 boost::asio::post(
105 ios_, SampleOp<Handler>(std::forward<Handler>(handler), Clock::now(), true, this));
106 }
107
108private:
109 void
110 cancel(std::unique_lock<decltype(mutex_)>& lock, bool wait)
111 {
112 if (!cancel_)
113 {
114 --count_;
115 cancel_ = true;
116 }
117
118 if (wait)
119 cond_.wait(lock, [this] { return this->count_ == 0; });
120 }
121
122 void
124 {
125 std::scoped_lock const lock(mutex_);
126 ++count_;
127 }
128
129 void
131 {
132 std::scoped_lock const lock(mutex_);
133 if (--count_ == 0)
134 cond_.notify_all();
135 }
136
137 template <class Handler>
138 struct SampleOp
139 {
140 Handler handler;
142 bool repeat;
144
146 Handler const& handler,
147 time_point const& start,
148 bool repeat,
151 {
152 XRPL_ASSERT(
153 probe,
154 "beast::IOLatencyProbe::SampleOp::SampleOp : non-null "
155 "probe input");
156 probe->addref();
157 }
158
159 SampleOp(SampleOp&& from) noexcept
160 : handler(std::move(from.handler))
161 , start(from.start)
162 , repeat(from.repeat)
163 , probe(from.probe)
164 {
165 XRPL_ASSERT(
166 probe,
167 "beast::IOLatencyProbe::SampleOp::SampleOp(SampleOp&&) : "
168 "non-null probe input");
169 from.probe = nullptr;
170 }
171
172 SampleOp(SampleOp const&) = delete;
174 operator=(SampleOp const&) = delete;
175 SampleOp&
176 operator=(SampleOp&&) = delete;
177
179 {
180 if (probe)
181 probe->release();
182 }
183
184 void
186 {
187 if (probe == nullptr)
188 return;
189 typename Clock::time_point const now(Clock::now());
190 typename Clock::duration const elapsed(now - start);
191
192 handler(elapsed);
193
194 {
195 std::scoped_lock const lock(probe->mutex_);
196 if (probe->cancel_)
197 return;
198 }
199
200 if (repeat)
201 {
202 // Calculate when we want to sample again, and
203 // adjust for the expected latency.
204 //
205 typename Clock::time_point const when(now + probe->period_ - (2 * elapsed));
206
207 if (when <= now)
208 {
209 // The latency is too high to maintain the desired
210 // period so don't bother with a timer.
211 //
212 boost::asio::post(probe->ios_, SampleOp<Handler>(handler, now, repeat, probe));
213 }
214 else
215 {
216 probe->timer_.expires_after(when - now);
217 probe->timer_.async_wait(SampleOp<Handler>(handler, now, repeat, probe));
218 }
219 }
220 }
221
222 void
223 operator()(boost::system::error_code const& ec)
224 {
225 if (probe == nullptr)
226 return;
227 typename Clock::time_point const now(Clock::now());
228 boost::asio::post(probe->ios_, SampleOp<Handler>(handler, now, repeat, probe));
229 }
230 };
231};
232
233} // namespace beast
IOLatencyProbe(duration const &period, boost::asio::io_context &ios)
boost::asio::io_context const & getIoContext() const
std::recursive_mutex mutex_
boost::asio::io_context & getIoContext()
Return the io_context associated with the latency probe.
void sampleOne(Handler &&handler)
Measure one sample of i/o latency.
void cancel(std::unique_lock< decltype(mutex_)> &lock, bool wait)
std::condition_variable_any cond_
void cancel()
Cancel all pending i/o.
void sample(Handler &&handler)
Initiate continuous i/o latency sampling.
boost::asio::basic_waitable_timer< std::chrono::steady_clock > timer_
Clock::duration duration
boost::asio::io_context & ios_
Clock::time_point time_point
T forward(T... args)
void operator()(boost::system::error_code const &ec)
SampleOp operator=(SampleOp const &)=delete
SampleOp(SampleOp const &)=delete
SampleOp & operator=(SampleOp &&)=delete
SampleOp(SampleOp &&from) noexcept
SampleOp(Handler const &handler, time_point const &start, bool repeat, IOLatencyProbe *probe)