rippled
Loading...
Searching...
No Matches
ResolverAsio.cpp
1#include <xrpl/basics/Log.h>
2#include <xrpl/basics/Resolver.h>
3#include <xrpl/basics/ResolverAsio.h>
4#include <xrpl/beast/net/IPAddressConversion.h>
5#include <xrpl/beast/net/IPEndpoint.h>
6#include <xrpl/beast/utility/Journal.h>
7#include <xrpl/beast/utility/instrumentation.h>
8
9#include <boost/asio/bind_executor.hpp>
10#include <boost/asio/error.hpp>
11#include <boost/asio/io_context.hpp>
12#include <boost/asio/ip/tcp.hpp>
13#include <boost/system/detail/error_code.hpp>
14
15#include <algorithm>
16#include <atomic>
17#include <cctype>
18#include <condition_variable>
19#include <deque>
20#include <functional>
21#include <iterator>
22#include <locale>
23#include <memory>
24#include <mutex>
25#include <string>
26#include <utility>
27#include <vector>
28
29namespace xrpl {
30
// Mix-in to track when all pending I/O is complete.
//
// The class keeps a count of pending handlers in m_pending (listed as
// std::atomic<int> in the symbol index). Every decrement that brings the
// count to zero calls Derived::asyncHandlersComplete(), so Derived must
// provide that member.
//
// NOTE(review): several lines of this class are not visible in this listing
// (the class header, the constructor/destructor signatures, parts of
// CompletionCounter and the m_pending declaration).
35template <class Derived>
37{
39 {
40 }
41
42public:
    // Destructor body (the assertion text names ~AsyncObject): checks that no
    // handlers are still pending when the object goes away.
44 {
45 // Destroying the object with I/O pending? Not a clean exit!
46 XRPL_ASSERT(m_pending.load() == 0, "xrpl::AsyncObject::~AsyncObject : nothing pending");
47 }
48
    // RAII container that maintains the count of pending I/O.
    // Constructing one from an owner pointer increments the owner's
    // m_pending.
54 {
55 public:
56 explicit CompletionCounter(Derived* owner) : m_owner(owner)
57 {
58 ++m_owner->m_pending;
59 }
60
    // Increments the owner's pending count once more. The symbol index lists
    // a CompletionCounter copy constructor; presumably this is its body
    // (signature line not visible here).
62 {
63 ++m_owner->m_pending;
64 }
65
    // Decrements the owner's pending count and, when it reaches zero,
    // notifies the owner through asyncHandlersComplete(). Presumably the
    // destructor body (signature line not visible here).
67 {
68 if (--m_owner->m_pending == 0)
69 m_owner->asyncHandlersComplete();
70 }
71
    // Copy assignment is disabled.
73 operator=(CompletionCounter const&) = delete;
74
75 private:
77 };
78
    // Manually increments the pending count without creating a
    // CompletionCounter. NOTE(review): the function name is not visible here;
    // ResolverAsioImpl::start() calls addReference(), which is presumably
    // this function.
79 void
81 {
82 ++m_pending;
83 }
84
    // Manual counterpart of the increment above: decrements the pending
    // count and calls Derived::asyncHandlersComplete() when it reaches zero.
    // (Function name not visible in this listing.)
85 void
87 {
88 if (--m_pending == 0)
89 (static_cast<Derived*>(this))->asyncHandlersComplete();
90 }
91
92private:
93 // The number of handlers pending.
95
    // Derived gets access to the private counter.
96 friend Derived;
97};
98
99class ResolverAsioImpl : public ResolverAsio, public AsyncObject<ResolverAsioImpl>
100{
101public:
103
105
106 boost::asio::io_context& m_io_context;
107 boost::asio::strand<boost::asio::io_context::executor_type> m_strand;
108 boost::asio::ip::tcp::resolver m_resolver;
109
113
116
117 // Represents a unit of work for the resolver to do
    // One Work item holds the list of names to resolve together with the
    // handler to call for each of them. (The member declarations are not
    // visible in this listing; the symbol index lists `names` as
    // std::vector<std::string>.)
118 struct Work
119 {
122
    // Builds a work item from any sequence offering size(), begin() and
    // end(). The names are stored in REVERSE order: do_work() takes the next
    // name from names.back() and pop_back()s it, so names end up being
    // processed in the order the caller supplied them.
123 template <class StringSequence>
124 Work(StringSequence const& names_, HandlerType const& handler_) : handler(handler_)
125 {
    // Single allocation up front; the copy below appends exactly
    // names_.size() elements.
126 names.reserve(names_.size());
127
128 std::reverse_copy(names_.begin(), names_.end(), std::back_inserter(names));
129 }
130 };
131
133
// Constructs the resolver in the "stopped" state: m_stopped starts as true
// and m_stop_called as false, which is what start() asserts on entry.
//
// io_context  context used to build both the strand (m_strand) and the TCP
//             resolver (m_resolver); held by reference in m_io_context.
// journal     destination for this object's log messages.
134 ResolverAsioImpl(boost::asio::io_context& io_context, beast::Journal journal)
135 : m_journal(journal)
136 , m_io_context(io_context)
137 , m_strand(boost::asio::make_strand(io_context))
138 , m_resolver(io_context)
139 , m_stop_called(false)
140 , m_stopped(true)
141 {
142 }
143
// Destructor body (signature line not visible in this listing; the assertion
// texts name ~ResolverAsioImpl). Checks the shutdown contract: the work queue
// must already be empty and the resolver must be in the stopped state.
145 {
146 XRPL_ASSERT(m_work.empty(), "xrpl::ResolverAsioImpl::~ResolverAsioImpl : no pending work");
147 XRPL_ASSERT(m_stopped, "xrpl::ResolverAsioImpl::~ResolverAsioImpl : stopped");
148 }
149
150 //-------------------------------------------------------------------------
151 // AsyncObject
152 void
159
160 //--------------------------------------------------------------------------
161 //
162 // Resolver
163 //
164 //--------------------------------------------------------------------------
165
// Issue a synchronous start request.
//
// Expected to be called while the resolver is stopped and no stop has been
// requested (both are asserted).
166 void
167 start() override
168 {
169 XRPL_ASSERT(m_stopped == true, "xrpl::ResolverAsioImpl::start : stopped");
170 XRPL_ASSERT(m_stop_called == false, "xrpl::ResolverAsioImpl::start : not stopping");
171
    // exchange() returns the previous value, so the body runs only for the
    // call that actually flips the state from stopped to running.
172 if (m_stopped.exchange(false))
173 {
    // NOTE(review): the statement executed while m_mut is held is not
    // visible in this listing.
174 {
175 std::lock_guard const lk{m_mut};
177 }
    // Presumably takes a pending-count reference on the AsyncObject base
    // for as long as the resolver is running - confirm against AsyncObject.
178 addReference();
179 }
180 }
181
// Issue an asynchronous stop request.
//
// Only the first call has any effect: exchange(true) returns the previous
// value of m_stop_called, so later calls skip the body. The actual shutdown
// is done by do_stop(), which is submitted through m_strand. The
// CompletionCounter bound into the handler increments the pending-handler
// count when it is constructed.
182 void
183 stop_async() override
184 {
185 if (!m_stop_called.exchange(true))
186 {
187 boost::asio::dispatch(
189 boost::asio::bind_executor(
190 m_strand,
191 std::bind(&ResolverAsioImpl::do_stop, this, CompletionCounter(this))));
192
193 JLOG(m_journal.debug()) << "Queued a stop request";
194 }
195 }
196
// Issue a synchronous stop request.
//
// Requests the stop via stop_async() and then blocks the calling thread on
// m_cv until m_asyncHandlersCompleted becomes true.
197 void
198 stop() override
199 {
200 stop_async();
201
202 JLOG(m_journal.debug()) << "Waiting to stop";
    // NOTE(review): the declaration of `lk` is not visible in this listing;
    // it is presumably a unique lock on m_mut - confirm against the full
    // source.
204 m_cv.wait(lk, [this] { return m_asyncHandlersCompleted; });
    // Release the lock before logging.
205 lk.unlock();
206 JLOG(m_journal.debug()) << "Stopped";
207 }
208
// Queues a request to resolve every entry of `names`.
//
// names    the names to resolve; must not be empty (asserted).
// handler  callback passed on to do_resolve() together with the names.
//
// Must not be called once a stop has been requested (asserted). The request
// is forwarded to do_resolve() through m_strand. std::bind stores its own
// copies of `names` and `handler`, so the caller's objects do not have to
// outlive this call.
209 void
210 resolve(std::vector<std::string> const& names, HandlerType const& handler) override
211 {
212 XRPL_ASSERT(m_stop_called == false, "xrpl::ResolverAsioImpl::resolve : not stopping");
213 XRPL_ASSERT(!names.empty(), "xrpl::ResolverAsioImpl::resolve : names non-empty");
214
215 // TODO NIKB use rvalue references to construct and move
216 // reducing cost.
217 boost::asio::dispatch(
219 boost::asio::bind_executor(
220 m_strand,
221 std::bind(
222 &ResolverAsioImpl::do_resolve, this, names, handler, CompletionCounter(this))));
223 }
224
225 //-------------------------------------------------------------------------
226 // Resolver
// Handler submitted by stop_async(); performs the actual shutdown.
//
// The unnamed CompletionCounter parameter is never read; it is only carried
// along with the handler.
227 void
228 do_stop(CompletionCounter)
229 {
230 XRPL_ASSERT(m_stop_called == true, "xrpl::ResolverAsioImpl::do_stop : stopping");
231
    // Only the first transition to "stopped" does the teardown.
232 if (!m_stopped.exchange(true))
233 {
    // Drop everything still queued and cancel outstanding resolver
    // operations.
234 m_work.clear();
235 m_resolver.cancel();
236
    // NOTE(review): one statement at this point is not visible in this
    // listing.
238 }
239 }
240
// Completion handler for one name lookup (the symbol index lists it as
// do_finish; the line carrying the name is not visible in this listing).
//
// name     the name that was being resolved, handed back to the handler.
// ec       result of the lookup.
// handler  callback invoked as handler(name, addresses).
// results  resolver results, walked only when ec reports success.
//
// An aborted operation returns immediately: the handler is not invoked and
// no further work is scheduled. In every other case the handler is invoked,
// then do_work() is posted through m_strand to continue with the next queued
// name.
241 void
243 std::string name,
244 boost::system::error_code const& ec,
245 HandlerType handler,
246 boost::asio::ip::tcp::resolver::results_type results,
247 CompletionCounter)
248 {
249 if (ec == boost::asio::error::operation_aborted)
250 return;
251
    // NOTE(review): the declaration of `addresses` (used below) and the
    // statement inside the loop are not visible in this listing.
253 auto iter = results.begin();
254
255 // If we get an error message back, we don't return any
256 // results that we may have gotten.
257 if (!ec)
258 {
259 while (iter != results.end())
260 {
262 ++iter;
263 }
264 }
265
266 handler(name, addresses);
267
268 boost::asio::post(
270 boost::asio::bind_executor(
271 m_strand, std::bind(&ResolverAsioImpl::do_work, this, CompletionCounter(this))));
272 }
273
// Splits a textual endpoint into a (host, port) pair of strings. The symbol
// index gives the signature as parseName(std::string const& str) and
// HostAndPort as std::pair<std::string, std::string>; the signature line
// itself is not visible in this listing.
//
// Two strategies are tried in order:
//  1. beast::IP::Endpoint::from_string_checked(str). On success the address
//     and port of the parsed endpoint are converted back to strings.
//  2. A generic split: leading and trailing whitespace is skipped, the host
//     runs up to the first whitespace character or ':', and the port is
//     whatever follows the run of separators. If there is no separator the
//     port comes back empty.
//
// do_work() treats an empty host as a parse failure.
274 static HostAndPort
276 {
277 // first attempt to parse as an endpoint (IP addr + port).
278 // If that doesn't succeed, fall back to generic name + port parsing
279
280 if (auto const result = beast::IP::Endpoint::from_string_checked(str))
281 {
282 return make_pair(result->address().to_string(), std::to_string(result->port()));
283 }
284
285 // generic name/port parsing, which doesn't work for
286 // IPv6 addresses in particular because it considers a colon
287 // a port separator
288
    // NOTE(review): the body of the find_whitespace predicate is not
    // visible in this listing.
289 // Attempt to find the first and last non-whitespace
290 auto const find_whitespace =
292
293 auto host_first = std::find_if_not(str.begin(), str.end(), find_whitespace);
294
    // Searching from the back and calling .base() yields the iterator one
    // past the last non-whitespace character.
295 auto port_last = std::find_if_not(str.rbegin(), str.rend(), find_whitespace).base();
296
    // NOTE(review): the statement controlled by the `if` below is not
    // visible in this listing.
297 // This should only happen for all-whitespace strings
298 if (host_first >= port_last)
300
301 // Attempt to find the first and last valid port separators
302 auto const find_port_separator = [](char const c) -> bool {
    // The cast to unsigned char keeps std::isspace well-defined for
    // negative char values.
303 if (std::isspace(static_cast<unsigned char>(c)))
304 return true;
305
306 if (c == ':')
307 return true;
308
309 return false;
310 };
311
    // host = [host_first, host_last): everything before the first separator.
312 auto host_last = std::find_if(host_first, port_last, find_port_separator);
313
    // port = [port_first, port_last): everything after the separator run.
314 auto port_first = std::find_if_not(host_last, port_last, find_port_separator);
315
316 return make_pair(std::string(host_first, host_last), std::string(port_first, port_last));
317 }
318
// Starts the lookup of the next queued name, if any.
//
// Does nothing once a stop has been requested or when the queue is empty.
// Otherwise it removes one name from the front work item, parses it into
// host and port, and starts an asynchronous resolve for it. A name that
// cannot be parsed is logged and skipped by posting do_work() again.
//
// The unnamed CompletionCounter parameter is never read; it is only carried
// along with the handler.
319 void
320 do_work(CompletionCounter)
321 {
322 if (m_stop_called)
323 return;
324
325 // We don't have any work to do at this time
326 if (m_work.empty())
327 return;
328
    // Copy the name and handler out before the queue is modified below:
    // pop_front() may destroy the Work item they live in.
329 std::string const name(m_work.front().names.back());
330 HandlerType const handler(m_work.front().handler);
331
332 m_work.front().names.pop_back();
333
    // A work item whose last name has been taken is finished.
334 if (m_work.front().names.empty())
335 m_work.pop_front();
336
337 auto const [host, port] = parseName(name);
338
339 if (host.empty())
340 {
341 JLOG(m_journal.error()) << "Unable to parse '" << name << "'";
342
343 boost::asio::post(
345 boost::asio::bind_executor(
346 m_strand,
347 std::bind(&ResolverAsioImpl::do_work, this, CompletionCounter(this))));
348
349 return;
350 }
351
    // NOTE(review): the member function bound as the completion callback is
    // not visible in this listing; the argument layout (name, _1, handler,
    // _2, counter) matches do_finish's parameter list.
352 m_resolver.async_resolve(
353 host,
354 port,
355 std::bind(
357 this,
358 name,
359 std::placeholders::_1,
360 handler,
361 std::placeholders::_2,
362 CompletionCounter(this)));
363 }
364
// Handler submitted by resolve(); enqueues a new work item.
//
// names    the names to resolve; must not be empty (asserted).
// handler  callback stored with the work item.
//
// Nothing is queued once a stop has been requested. The unnamed
// CompletionCounter parameter is never read; it is only carried along with
// the handler.
365 void
366 do_resolve(std::vector<std::string> const& names, HandlerType const& handler, CompletionCounter)
367 {
368 XRPL_ASSERT(!names.empty(), "xrpl::ResolverAsioImpl::do_resolve : names non-empty");
369
370 if (!m_stop_called)
371 {
372 m_work.emplace_back(names, handler);
373
374 JLOG(m_journal.debug()) << "Queued new job with " << names.size() << " tasks. "
375 << m_work.size() << " jobs outstanding.";
376
    // The queue cannot be empty immediately after the emplace_back
    // above, so this condition always holds here.
377 if (!m_work.empty())
378 {
379 boost::asio::post(
381 boost::asio::bind_executor(
382 m_strand,
383 std::bind(&ResolverAsioImpl::do_work, this, CompletionCounter(this))));
384 }
385 }
386 }
387};
388
389//-----------------------------------------------------------------------------
390
// Factory for the concrete resolver: builds a ResolverAsioImpl from the
// given io_context and journal. The symbol index gives the return type as
// std::unique_ptr<ResolverAsio>; the return-type line itself is not visible
// in this listing.
392ResolverAsio::New(boost::asio::io_context& io_context, beast::Journal journal)
393{
394 return std::make_unique<ResolverAsioImpl>(io_context, journal);
395}
396
397//-----------------------------------------------------------------------------
// Out-of-line definition of Resolver's destructor. The symbol index shows it
// declared as `virtual ~Resolver()=0`; a pure virtual destructor still needs
// a definition because every derived-class destructor calls it.
398Resolver::~Resolver() = default;
399} // namespace xrpl
T back_inserter(T... args)
T begin(T... args)
T bind(T... args)
static std::optional< Endpoint > from_string_checked(std::string const &s)
Create an Endpoint from a string.
A generic endpoint for log messages.
Definition Journal.h:40
Stream error() const
Definition Journal.h:319
Stream debug() const
Definition Journal.h:301
RAII container that maintains the count of pending I/O.
CompletionCounter & operator=(CompletionCounter const &)=delete
CompletionCounter(CompletionCounter const &other)
Mix-in to track when all pending I/O is complete.
std::atomic< int > m_pending
std::atomic< bool > m_stop_called
void do_stop(CompletionCounter)
boost::asio::io_context & m_io_context
boost::asio::ip::tcp::resolver m_resolver
ResolverAsioImpl(boost::asio::io_context &io_context, beast::Journal journal)
void resolve(std::vector< std::string > const &names, HandlerType const &handler) override
void start() override
Issue a synchronous start request.
void do_finish(std::string name, boost::system::error_code const &ec, HandlerType handler, boost::asio::ip::tcp::resolver::results_type results, CompletionCounter)
std::deque< Work > m_work
void stop() override
Issue a synchronous stop request.
std::condition_variable m_cv
std::atomic< bool > m_stopped
boost::asio::strand< boost::asio::io_context::executor_type > m_strand
void do_resolve(std::vector< std::string > const &names, HandlerType const &handler, CompletionCounter)
static HostAndPort parseName(std::string const &str)
std::pair< std::string, std::string > HostAndPort
void stop_async() override
Issue an asynchronous stop request.
void do_work(CompletionCounter)
static std::unique_ptr< ResolverAsio > New(boost::asio::io_context &, beast::Journal)
virtual ~Resolver()=0
T empty(T... args)
T end(T... args)
T exchange(T... args)
T find_if_not(T... args)
T is_same_v
T load(T... args)
T make_pair(T... args)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
T push_back(T... args)
T rbegin(T... args)
T rend(T... args)
T reserve(T... args)
T reverse_copy(T... args)
T size(T... args)
static IP::Endpoint from_asio(boost::asio::ip::address const &address)
std::vector< std::string > names
Work(StringSequence const &names_, HandlerType const &handler_)
T to_string(T... args)