rippled
Loading...
Searching...
No Matches
ResolverAsio.cpp
1#include <xrpl/basics/Log.h>
2#include <xrpl/basics/Resolver.h>
3#include <xrpl/basics/ResolverAsio.h>
4#include <xrpl/beast/net/IPAddressConversion.h>
5#include <xrpl/beast/net/IPEndpoint.h>
6#include <xrpl/beast/utility/Journal.h>
7#include <xrpl/beast/utility/instrumentation.h>
8
9#include <boost/asio/bind_executor.hpp>
10#include <boost/asio/error.hpp>
11#include <boost/asio/io_context.hpp>
12#include <boost/asio/ip/tcp.hpp>
13#include <boost/system/detail/error_code.hpp>
14
15#include <algorithm>
16#include <atomic>
17#include <cctype>
18#include <condition_variable>
19#include <deque>
20#include <functional>
21#include <iterator>
22#include <locale>
23#include <memory>
24#include <mutex>
25#include <string>
26#include <utility>
27#include <vector>
28
29namespace ripple {
30
35template <class Derived>
37{
38protected:
40 {
41 }
42
43public:
45 {
46 // Destroying the object with I/O pending? Not a clean exit!
47 XRPL_ASSERT(
48 m_pending.load() == 0,
49 "ripple::AsyncObject::~AsyncObject : nothing pending");
50 }
51
57 {
58 public:
59 explicit CompletionCounter(Derived* owner) : m_owner(owner)
60 {
61 ++m_owner->m_pending;
62 }
63
65 : m_owner(other.m_owner)
66 {
67 ++m_owner->m_pending;
68 }
69
71 {
72 if (--m_owner->m_pending == 0)
73 m_owner->asyncHandlersComplete();
74 }
75
77 operator=(CompletionCounter const&) = delete;
78
79 private:
80 Derived* m_owner;
81 };
82
83 void
85 {
86 ++m_pending;
87 }
88
89 void
91 {
92 if (--m_pending == 0)
93 (static_cast<Derived*>(this))->asyncHandlersComplete();
94 }
95
96private:
97 // The number of handlers pending.
99};
100
102 public AsyncObject<ResolverAsioImpl>
103{
104public:
106
108
109 boost::asio::io_context& m_io_context;
110 boost::asio::strand<boost::asio::io_context::executor_type> m_strand;
111 boost::asio::ip::tcp::resolver m_resolver;
112
116
119
120 // Represents a unit of work for the resolver to do
121 struct Work
122 {
125
126 template <class StringSequence>
127 Work(StringSequence const& names_, HandlerType const& handler_)
128 : handler(handler_)
129 {
130 names.reserve(names_.size());
131
133 names_.begin(), names_.end(), std::back_inserter(names));
134 }
135 };
136
138
140 boost::asio::io_context& io_context,
141 beast::Journal journal)
142 : m_journal(journal)
143 , m_io_context(io_context)
144 , m_strand(boost::asio::make_strand(io_context))
145 , m_resolver(io_context)
147 , m_stop_called(false)
148 , m_stopped(true)
149 {
150 }
151
153 {
154 XRPL_ASSERT(
155 m_work.empty(),
156 "ripple::ResolverAsioImpl::~ResolverAsioImpl : no pending work");
157 XRPL_ASSERT(
158 m_stopped, "ripple::ResolverAsioImpl::~ResolverAsioImpl : stopped");
159 }
160
161 //-------------------------------------------------------------------------
162 // AsyncObject
163 void
170
171 //--------------------------------------------------------------------------
172 //
173 // Resolver
174 //
175 //--------------------------------------------------------------------------
176
177 void
178 start() override
179 {
180 XRPL_ASSERT(
181 m_stopped == true, "ripple::ResolverAsioImpl::start : stopped");
182 XRPL_ASSERT(
183 m_stop_called == false,
184 "ripple::ResolverAsioImpl::start : not stopping");
185
186 if (m_stopped.exchange(false) == true)
187 {
188 {
191 }
192 addReference();
193 }
194 }
195
196 void
197 stop_async() override
198 {
199 if (m_stop_called.exchange(true) == false)
200 {
201 boost::asio::dispatch(
203 boost::asio::bind_executor(
204 m_strand,
205 std::bind(
207 this,
208 CompletionCounter(this))));
209
210 JLOG(m_journal.debug()) << "Queued a stop request";
211 }
212 }
213
214 void
215 stop() override
216 {
217 stop_async();
218
219 JLOG(m_journal.debug()) << "Waiting to stop";
221 m_cv.wait(lk, [this] { return m_asyncHandlersCompleted; });
222 lk.unlock();
223 JLOG(m_journal.debug()) << "Stopped";
224 }
225
226 void
227 resolve(std::vector<std::string> const& names, HandlerType const& handler)
228 override
229 {
230 XRPL_ASSERT(
231 m_stop_called == false,
232 "ripple::ResolverAsioImpl::resolve : not stopping");
233 XRPL_ASSERT(
234 !names.empty(),
235 "ripple::ResolverAsioImpl::resolve : names non-empty");
236
237 // TODO NIKB use rvalue references to construct and move
238 // reducing cost.
239 boost::asio::dispatch(
241 boost::asio::bind_executor(
242 m_strand,
243 std::bind(
245 this,
246 names,
247 handler,
248 CompletionCounter(this))));
249 }
250
251 //-------------------------------------------------------------------------
252 // Resolver
253 void
254 do_stop(CompletionCounter)
255 {
256 XRPL_ASSERT(
257 m_stop_called == true,
258 "ripple::ResolverAsioImpl::do_stop : stopping");
259
260 if (m_stopped.exchange(true) == false)
261 {
262 m_work.clear();
263 m_resolver.cancel();
264
266 }
267 }
268
269 void
271 std::string name,
272 boost::system::error_code const& ec,
273 HandlerType handler,
274 boost::asio::ip::tcp::resolver::results_type results,
275 CompletionCounter)
276 {
277 if (ec == boost::asio::error::operation_aborted)
278 return;
279
281 auto iter = results.begin();
282
283 // If we get an error message back, we don't return any
284 // results that we may have gotten.
285 if (!ec)
286 {
287 while (iter != results.end())
288 {
289 addresses.push_back(
291 ++iter;
292 }
293 }
294
295 handler(name, addresses);
296
297 boost::asio::post(
299 boost::asio::bind_executor(
300 m_strand,
301 std::bind(
303 this,
304 CompletionCounter(this))));
305 }
306
309 {
310 // first attempt to parse as an endpoint (IP addr + port).
311 // If that doesn't succeed, fall back to generic name + port parsing
312
313 if (auto const result = beast::IP::Endpoint::from_string_checked(str))
314 {
315 return make_pair(
316 result->address().to_string(), std::to_string(result->port()));
317 }
318
319 // generic name/port parsing, which doesn't work for
320 // IPv6 addresses in particular because it considers a colon
321 // a port separator
322
323 // Attempt to find the first and last non-whitespace
324 auto const find_whitespace = std::bind(
326 std::placeholders::_1,
327 std::locale());
328
329 auto host_first =
330 std::find_if_not(str.begin(), str.end(), find_whitespace);
331
332 auto port_last =
333 std::find_if_not(str.rbegin(), str.rend(), find_whitespace).base();
334
335 // This should only happen for all-whitespace strings
336 if (host_first >= port_last)
338
339 // Attempt to find the first and last valid port separators
340 auto const find_port_separator = [](char const c) -> bool {
341 if (std::isspace(static_cast<unsigned char>(c)))
342 return true;
343
344 if (c == ':')
345 return true;
346
347 return false;
348 };
349
350 auto host_last =
351 std::find_if(host_first, port_last, find_port_separator);
352
353 auto port_first =
354 std::find_if_not(host_last, port_last, find_port_separator);
355
356 return make_pair(
357 std::string(host_first, host_last),
358 std::string(port_first, port_last));
359 }
360
361 void
362 do_work(CompletionCounter)
363 {
364 if (m_stop_called == true)
365 return;
366
367 // We don't have any work to do at this time
368 if (m_work.empty())
369 return;
370
371 std::string const name(m_work.front().names.back());
372 HandlerType handler(m_work.front().handler);
373
374 m_work.front().names.pop_back();
375
376 if (m_work.front().names.empty())
377 m_work.pop_front();
378
379 auto const [host, port] = parseName(name);
380
381 if (host.empty())
382 {
383 JLOG(m_journal.error()) << "Unable to parse '" << name << "'";
384
385 boost::asio::post(
387 boost::asio::bind_executor(
388 m_strand,
389 std::bind(
391 this,
392 CompletionCounter(this))));
393
394 return;
395 }
396
397 m_resolver.async_resolve(
398 host,
399 port,
400 std::bind(
402 this,
403 name,
404 std::placeholders::_1,
405 handler,
406 std::placeholders::_2,
407 CompletionCounter(this)));
408 }
409
410 void
412 std::vector<std::string> const& names,
413 HandlerType const& handler,
414 CompletionCounter)
415 {
416 XRPL_ASSERT(
417 !names.empty(),
418 "ripple::ResolverAsioImpl::do_resolve : names non-empty");
419
420 if (m_stop_called == false)
421 {
422 m_work.emplace_back(names, handler);
423
424 JLOG(m_journal.debug())
425 << "Queued new job with " << names.size() << " tasks. "
426 << m_work.size() << " jobs outstanding.";
427
428 if (m_work.size() > 0)
429 {
430 boost::asio::post(
432 boost::asio::bind_executor(
433 m_strand,
434 std::bind(
436 this,
437 CompletionCounter(this))));
438 }
439 }
440 }
441};
442
443//-----------------------------------------------------------------------------
444
446ResolverAsio::New(boost::asio::io_context& io_context, beast::Journal journal)
447{
448 return std::make_unique<ResolverAsioImpl>(io_context, journal);
449}
450
451//-----------------------------------------------------------------------------
452Resolver::~Resolver() = default;
453} // namespace ripple
T back_inserter(T... args)
T begin(T... args)
T bind(T... args)
static std::optional< Endpoint > from_string_checked(std::string const &s)
Create an Endpoint from a string.
A generic endpoint for log messages.
Definition Journal.h:41
Stream error() const
Definition Journal.h:327
Stream debug() const
Definition Journal.h:309
RAII container that maintains the count of pending I/O.
CompletionCounter(CompletionCounter const &other)
CompletionCounter & operator=(CompletionCounter const &)=delete
Mix-in to track when all pending I/O is complete.
std::atomic< int > m_pending
std::atomic< bool > m_stopped
boost::asio::strand< boost::asio::io_context::executor_type > m_strand
void do_work(CompletionCounter)
void resolve(std::vector< std::string > const &names, HandlerType const &handler) override
void stop() override
Issue a synchronous stop request.
std::condition_variable m_cv
boost::asio::ip::tcp::resolver m_resolver
ResolverAsioImpl(boost::asio::io_context &io_context, beast::Journal journal)
std::pair< std::string, std::string > HostAndPort
void do_stop(CompletionCounter)
std::deque< Work > m_work
void start() override
Issue a synchronous start request.
std::atomic< bool > m_stop_called
void stop_async() override
Issue an asynchronous stop request.
HostAndPort parseName(std::string const &str)
boost::asio::io_context & m_io_context
void do_finish(std::string name, boost::system::error_code const &ec, HandlerType handler, boost::asio::ip::tcp::resolver::results_type results, CompletionCounter)
void do_resolve(std::vector< std::string > const &names, HandlerType const &handler, CompletionCounter)
static std::unique_ptr< ResolverAsio > New(boost::asio::io_context &, beast::Journal)
virtual ~Resolver()=0
T empty(T... args)
T end(T... args)
T exchange(T... args)
T find_if_not(T... args)
T is_same_v
T load(T... args)
T make_pair(T... args)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
T push_back(T... args)
T rbegin(T... args)
T rend(T... args)
T reserve(T... args)
T reverse_copy(T... args)
T size(T... args)
static IP::Endpoint from_asio(boost::asio::ip::address const &address)
std::vector< std::string > names
Work(StringSequence const &names_, HandlerType const &handler_)
T to_string(T... args)