xrpld
Loading...
Searching...
No Matches
ResolverAsio.cpp
1#include <xrpl/basics/ResolverAsio.h>
2
3#include <xrpl/basics/Log.h>
4#include <xrpl/basics/Resolver.h>
5#include <xrpl/beast/net/IPAddressConversion.h>
6#include <xrpl/beast/net/IPEndpoint.h>
7#include <xrpl/beast/utility/Journal.h>
8#include <xrpl/beast/utility/instrumentation.h>
9
10#include <boost/asio/bind_executor.hpp>
11#include <boost/asio/dispatch.hpp>
12#include <boost/asio/error.hpp>
13#include <boost/asio/io_context.hpp>
14#include <boost/asio/ip/tcp.hpp>
15#include <boost/asio/post.hpp>
16#include <boost/asio/strand.hpp>
17#include <boost/system/detail/error_code.hpp>
18
19#include <algorithm>
20#include <atomic>
21#include <cctype>
22#include <condition_variable>
23#include <deque>
24#include <functional>
25#include <iterator>
26#include <locale>
27#include <memory>
28#include <mutex>
29#include <ranges>
30#include <string>
31#include <utility>
32#include <vector>
33
34namespace xrpl {
35
40template <class Derived>
42{
44 {
45 }
46
47public:
49 {
50 // Destroying the object with I/O pending? Not a clean exit!
51 XRPL_ASSERT(pending_.load() == 0, "xrpl::AsyncObject::~AsyncObject : nothing pending");
52 }
53
59 {
60 public:
61 explicit CompletionCounter(Derived* owner) : owner_(owner)
62 {
63 ++owner_->pending_;
64 }
65
67 {
68 ++owner_->pending_;
69 }
70
72 {
73 if (--owner_->pending_ == 0)
74 owner_->asyncHandlersComplete();
75 }
76
78 operator=(CompletionCounter const&) = delete;
79
80 private:
82 };
83
84 void
86 {
87 ++pending_;
88 }
89
90 void
92 {
93 if (--pending_ == 0)
94 (static_cast<Derived*>(this))->asyncHandlersComplete();
95 }
96
97private:
98 // The number of handlers pending.
100
101 friend Derived;
102};
103
104class ResolverAsioImpl : public ResolverAsio, public AsyncObject<ResolverAsioImpl>
105{
106public:
108
110
111 boost::asio::io_context& ioContext;
112 boost::asio::strand<boost::asio::io_context::executor_type> strand;
113 boost::asio::ip::tcp::resolver resolver;
114
118
121
122 // Represents a unit of work for the resolver to do
123 struct Work
124 {
127
128 template <class StringSequence>
129 Work(StringSequence const& inNames, HandlerType handler) : handler(std::move(handler))
130 {
131 names.reserve(inNames.size());
132
133 std::reverse_copy(inNames.begin(), inNames.end(), std::back_inserter(names));
134 }
135 };
136
138
142 , strand(boost::asio::make_strand(ioContext))
144 , stopCalled(false)
145 , stopped(true)
146 {
147 }
148
150 {
151 XRPL_ASSERT(work.empty(), "xrpl::ResolverAsioImpl::~ResolverAsioImpl : no pending work");
152 XRPL_ASSERT(stopped, "xrpl::ResolverAsioImpl::~ResolverAsioImpl : stopped");
153 }
154
155 //-------------------------------------------------------------------------
156 // AsyncObject
157 void
159 {
162 cv.notify_all();
163 }
164
165 //--------------------------------------------------------------------------
166 //
167 // Resolver
168 //
169 //--------------------------------------------------------------------------
170
171 void
172 start() override
173 {
174 XRPL_ASSERT(stopped == true, "xrpl::ResolverAsioImpl::start : stopped");
175 XRPL_ASSERT(stopCalled == false, "xrpl::ResolverAsioImpl::start : not stopping");
176
177 if (stopped.exchange(false))
178 {
179 {
180 std::scoped_lock const lk{mut};
182 }
183 addReference();
184 }
185 }
186
187 void
188 stopAsync() override
189 {
190 if (!stopCalled.exchange(true))
191 {
192 boost::asio::dispatch(
193 ioContext,
194 boost::asio::bind_executor(
195 strand, std::bind(&ResolverAsioImpl::doStop, this, CompletionCounter(this))));
196
197 JLOG(journal.debug()) << "Queued a stop request";
198 }
199 }
200
201 void
202 stop() override
203 {
204 stopAsync();
205
206 JLOG(journal.debug()) << "Waiting to stop";
208 cv.wait(lk, [this] { return asyncHandlersCompleted; });
209 lk.unlock();
210 JLOG(journal.debug()) << "Stopped";
211 }
212
213 void
214 resolve(std::vector<std::string> const& names, HandlerType const& handler) override
215 {
216 XRPL_ASSERT(stopCalled == false, "xrpl::ResolverAsioImpl::resolve : not stopping");
217 XRPL_ASSERT(!names.empty(), "xrpl::ResolverAsioImpl::resolve : names non-empty");
218
219 // TODO NIKB use rvalue references to construct and move
220 // reducing cost.
221 boost::asio::dispatch(
222 ioContext,
223 boost::asio::bind_executor(
224 strand,
225 std::bind(
226 &ResolverAsioImpl::doResolve, this, names, handler, CompletionCounter(this))));
227 }
228
229 //-------------------------------------------------------------------------
230 // Resolver
231 void
232 doStop(CompletionCounter)
233 {
234 XRPL_ASSERT(stopCalled == true, "xrpl::ResolverAsioImpl::doStop : stopping");
235
236 if (!stopped.exchange(true))
237 {
238 work.clear();
239 resolver.cancel();
240
242 }
243 }
244
245 void
247 std::string name,
248 boost::system::error_code const& ec,
249 HandlerType handler,
250 boost::asio::ip::tcp::resolver::results_type results,
251 CompletionCounter)
252 {
253 if (ec == boost::asio::error::operation_aborted)
254 return;
255
257 auto iter = results.begin();
258
259 // If we get an error message back, we don't return any
260 // results that we may have gotten.
261 if (!ec)
262 {
263 while (iter != results.end())
264 {
266 ++iter;
267 }
268 }
269
270 handler(name, addresses);
271
272 boost::asio::post(
273 ioContext,
274 boost::asio::bind_executor(
275 strand, std::bind(&ResolverAsioImpl::doWork, this, CompletionCounter(this))));
276 }
277
278 static HostAndPort
280 {
281 // first attempt to parse as an endpoint (IP addr + port).
282 // If that doesn't succeed, fall back to generic name + port parsing
283
284 if (auto const result = beast::IP::Endpoint::fromStringChecked(str))
285 {
286 return make_pair(result->address().to_string(), std::to_string(result->port()));
287 }
288
289 // generic name/port parsing, which doesn't work for
290 // IPv6 addresses in particular because it considers a colon
291 // a port separator
292
293 // Attempt to find the first and last non-whitespace
294 auto const findWhitespace =
295 std::bind(&std::isspace<std::string::value_type>, std::placeholders::_1, std::locale());
296
297 auto hostFirst = std::ranges::find_if_not(str, findWhitespace);
298
299 auto portLast =
300 std::ranges::find_if_not(std::ranges::reverse_view(str), findWhitespace).base();
301
302 // This should only happen for all-whitespace strings
303 if (hostFirst >= portLast)
305
306 // Attempt to find the first and last valid port separators
307 auto const findPortSeparator = [](char const c) -> bool {
308 if (std::isspace(static_cast<unsigned char>(c)))
309 return true;
310
311 if (c == ':')
312 return true;
313
314 return false;
315 };
316
317 auto hostLast = std::find_if(hostFirst, portLast, findPortSeparator);
318
319 auto portFirst = std::find_if_not(hostLast, portLast, findPortSeparator);
320
321 return make_pair(std::string(hostFirst, hostLast), std::string(portFirst, portLast));
322 }
323
324 void
325 doWork(CompletionCounter)
326 {
327 if (stopCalled)
328 return;
329
330 // We don't have any work to do at this time
331 if (work.empty())
332 return;
333
334 std::string const name(work.front().names.back());
335 HandlerType const handler(work.front().handler);
336
337 work.front().names.pop_back();
338
339 if (work.front().names.empty())
340 work.pop_front();
341
342 auto const [host, port] = parseName(name);
343
344 if (host.empty())
345 {
346 JLOG(journal.error()) << "Unable to parse '" << name << "'";
347
348 boost::asio::post(
349 ioContext,
350 boost::asio::bind_executor(
351 strand, std::bind(&ResolverAsioImpl::doWork, this, CompletionCounter(this))));
352
353 return;
354 }
355
356 resolver.async_resolve(
357 host,
358 port,
359 std::bind(
361 this,
362 name,
363 std::placeholders::_1,
364 handler,
365 std::placeholders::_2,
366 CompletionCounter(this)));
367 }
368
369 void
370 doResolve(std::vector<std::string> const& names, HandlerType const& handler, CompletionCounter)
371 {
372 XRPL_ASSERT(!names.empty(), "xrpl::ResolverAsioImpl::doResolve : names non-empty");
373
374 if (!stopCalled)
375 {
376 work.emplace_back(names, handler);
377
378 JLOG(journal.debug()) << "Queued new job with " << names.size() << " tasks. "
379 << work.size() << " jobs outstanding.";
380
381 if (!work.empty())
382 {
383 boost::asio::post(
384 ioContext,
385 boost::asio::bind_executor(
386 strand,
387 std::bind(&ResolverAsioImpl::doWork, this, CompletionCounter(this))));
388 }
389 }
390 }
391};
392
393//-----------------------------------------------------------------------------
394
396ResolverAsio::make(boost::asio::io_context& ioContext, beast::Journal journal)
397{
398 return std::make_unique<ResolverAsioImpl>(ioContext, journal);
399}
400
401//-----------------------------------------------------------------------------
402Resolver::~Resolver() = default;
403} // namespace xrpl
T back_inserter(T... args)
T begin(T... args)
T bind(T... args)
static std::optional< Endpoint > fromStringChecked(std::string const &s)
Create an Endpoint from a string.
A generic endpoint for log messages.
Definition Journal.h:38
RAII container that maintains the count of pending I/O.
CompletionCounter & operator=(CompletionCounter const &)=delete
CompletionCounter(CompletionCounter const &other)
std::atomic< int > pending_
void resolve(std::vector< std::string > const &names, HandlerType const &handler) override
void stopAsync() override
Issue an asynchronous stop request.
void start() override
Issue a synchronous start request.
ResolverAsioImpl(boost::asio::io_context &ioContext, beast::Journal journal)
std::condition_variable cv
std::atomic< bool > stopCalled
void doWork(CompletionCounter)
std::deque< Work > work
void stop() override
Issue a synchronous stop request.
static HostAndPort parseName(std::string const &str)
void doFinish(std::string name, boost::system::error_code const &ec, HandlerType handler, boost::asio::ip::tcp::resolver::results_type results, CompletionCounter)
boost::asio::ip::tcp::resolver resolver
boost::asio::strand< boost::asio::io_context::executor_type > strand
std::pair< std::string, std::string > HostAndPort
boost::asio::io_context & ioContext
std::atomic< bool > stopped
void doStop(CompletionCounter)
void doResolve(std::vector< std::string > const &names, HandlerType const &handler, CompletionCounter)
static std::unique_ptr< ResolverAsio > make(boost::asio::io_context &, beast::Journal)
ResolverAsio()=default
virtual ~Resolver()=0
std::function< void(std::string, std::vector< beast::IP::Endpoint >)> HandlerType
Definition Resolver.h:13
T empty(T... args)
T find_if_not(T... args)
T make_pair(T... args)
T make_unique(T... args)
STL namespace.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
T push_back(T... args)
T reverse_copy(T... args)
T size(T... args)
static IP::Endpoint fromAsio(boost::asio::ip::address const &address)
Work(StringSequence const &inNames, HandlerType handler)
std::vector< std::string > names
T to_string(T... args)
T unlock(T... args)