xrpld
Loading...
Searching...
No Matches
ConnectAttempt.cpp
1#include <xrpld/overlay/detail/ConnectAttempt.h>
2
3#include <xrpld/app/main/Application.h>
4#include <xrpld/overlay/Cluster.h>
5#include <xrpld/overlay/Peer.h>
6#include <xrpld/overlay/detail/Handshake.h>
7#include <xrpld/overlay/detail/OverlayImpl.h>
8#include <xrpld/overlay/detail/PeerImp.h>
9#include <xrpld/overlay/detail/ProtocolVersion.h>
10#include <xrpld/peerfinder/PeerfinderManager.h>
11#include <xrpld/peerfinder/Slot.h>
12
13#include <xrpl/basics/Log.h>
14#include <xrpl/beast/net/IPAddressConversion.h>
15#include <xrpl/beast/utility/Journal.h>
16#include <xrpl/beast/utility/instrumentation.h>
17#include <xrpl/json/json_reader.h>
18#include <xrpl/json/json_value.h>
19#include <xrpl/protocol/PublicKey.h>
20#include <xrpl/protocol/tokens.h>
21#include <xrpl/resource/Consumer.h>
22
23#include <boost/asio/bind_executor.hpp>
24#include <boost/asio/buffer.hpp>
25#include <boost/asio/error.hpp>
26#include <boost/asio/io_context.hpp>
27#include <boost/asio/ip/tcp.hpp>
28#include <boost/asio/post.hpp>
29#include <boost/asio/ssl/stream_base.hpp>
30#include <boost/asio/ssl/verify_mode.hpp>
31#include <boost/asio/strand.hpp>
32#include <boost/beast/http/impl/read.hpp>
33#include <boost/beast/http/impl/write.hpp>
34#include <boost/beast/http/status.hpp>
35#include <boost/system/system_error.hpp>
36
37#include <chrono>
38#include <exception>
39#include <functional>
40#include <memory>
41#include <optional>
42#include <string>
43#include <utility>
44#include <vector>
45
46namespace xrpl {
47
// Constructor parameter list and member-initializer list; the body is empty.
// The embedded listing numbers skip 48, 52, 55, 62 and 72, so the line that
// opens the signature and a few parameters/initializers are not visible here.
49 Application& app,
50 boost::asio::io_context& ioContext,
51 endpoint_type remoteEndpoint,
53 shared_context const& context,
54 Peer::id_t id,
56 beast::Journal journal,
57 OverlayImpl& overlay)
58 : Child(overlay)
59 , app_(app)
60 , id_(id)
// Log sink built from the caller's journal and a prefix derived from the id.
61 , sink_(journal, OverlayImpl::makePrefix(id))
63 , remoteEndpoint_(std::move(remoteEndpoint))
// NOTE(review): `usage` (and `slot` below) are declared on listing lines that
// are absent from this view — confirm their types against the header.
64 , usage_(usage)
// Completion handlers in this file are bound to this strand.
65 , strand_(boost::asio::make_strand(ioContext))
66 , timer_(ioContext)
// The stream is built from a socket on ioContext and the dereferenced context.
67 , streamPtr_(
68 std::make_unique<stream_type>(
69 socket_type(std::forward<boost::asio::io_context&>(ioContext)),
70 *context))
// Bound to the socket returned by the stream's next layer.
71 , socket_(streamPtr_->next_layer().socket())
73 , slot_(slot)
74{
75}
76
// Destructor body (its signature line, listing line 77, is absent here).
78{
// A slot that is still held is reported to the peer finder as closed.
79 if (slot_ != nullptr)
80 overlay_.peerFinder().onClosed(slot_);
81 JLOG(journal_.trace()) << "~ConnectAttempt";
82}
83
// Stops the attempt: logs "Stop" if the socket is still open, then calls
// close(). The function-name line (listing line 85) is absent from this view.
84void
86{
// Off-strand callers return early. Listing line 89 is missing; presumably it
// re-dispatches this call onto strand_ before returning — TODO confirm.
87 if (!strand_.running_in_this_thread())
88 {
90 return;
91 }
92 if (socket_.is_open())
93 {
94 JLOG(journal_.debug()) << "Stop";
95 }
96 close();
97}
98
// Starts the attempt: arms the timer, then begins an asynchronous connect whose
// completion is delivered to onConnect through strand_.
// NOTE(review): the function-name line (listing line 100) and the first
// async_connect argument (listing line 105) are absent from this view; the
// name is presumably run() and the argument presumably the remote endpoint —
// confirm against the original file.
99void
101{
102 setTimer();
103
// The bound handler holds a shared_ptr to this object via shared_from_this().
104 stream_.next_layer().async_connect(
106 boost::asio::bind_executor(
107 strand_,
108 std::bind(&ConnectAttempt::onConnect, shared_from_this(), std::placeholders::_1)));
109}
110
111//------------------------------------------------------------------------------
112
// Cancels the timer and closes the socket, then logs "Closed".
// Asserts that it is running on strand_ and does nothing when the socket is
// already closed. (The function-name line, listing line 114, is absent here;
// the name is taken from the assertion message.)
113void
115{
116 XRPL_ASSERT(
117 strand_.running_in_this_thread(), "xrpl::ConnectAttempt::close : strand in this thread");
118 if (!socket_.is_open())
119 return;
120
// A boost::system::system_error thrown by cancel() or close() is swallowed.
121 try
122 {
123 timer_.cancel();
124 socket_.close();
125 }
126 catch (boost::system::system_error const&) // NOLINT(bugprone-empty-catch)
127 {
128 // ignored
129 }
130
131 JLOG(journal_.debug()) << "Closed";
132}
133
// Failure helper: logs `reason` at debug level, then closes the connection.
// (The signature line, listing line 135, is absent from this view.)
134void
136{
137 JLOG(journal_.debug()) << reason;
138 close();
139}
140
// Failure helper taking a label and an error code: logs "<name>: <message>"
// at debug level, then closes the connection.
// (The signature line, listing line 142, is absent from this view; callers in
// this file pass a string literal and an error_code.)
141void
143{
144 JLOG(journal_.debug()) << name << ": " << ec.message();
145 close();
146}
147
// Arms timer_ to expire 15 seconds from now and starts an asynchronous wait
// whose completion is delivered to onTimer through strand_.
// (The function-name line, listing line 149, is absent here; the name is taken
// from the log message and from callers in this file.)
148void
150{
// If setting the expiry throws, log the error code and skip the wait.
151 try
152 {
153 timer_.expires_after(std::chrono::seconds(15));
154 }
155 catch (boost::system::system_error const& e)
156 {
157 JLOG(journal_.error()) << "setTimer: " << e.code();
158 return;
159 }
160
161 timer_.async_wait(
162 boost::asio::bind_executor(
163 strand_,
164 std::bind(&ConnectAttempt::onTimer, shared_from_this(), std::placeholders::_1)));
165}
166
// Cancels timer_, swallowing any boost::system::system_error.
// NOTE(review): the function-name line (listing line 168) is absent from this
// view; the name is presumably cancelTimer, which the handlers in this file
// call — confirm against the original file.
167void
169{
170 try
171 {
172 timer_.cancel();
173 }
174 catch (boost::system::system_error const&) // NOLINT(bugprone-empty-catch)
175 {
176 // ignored
177 }
178}
179
// Completion handler for the timer wait started in setTimer.
// A wait that completes without error means the timeout elapsed and the
// attempt is failed with "Timeout". (The signature line, listing line 181, is
// absent here; the tooltip index gives it as onTimer(error_code ec).)
180void
182{
// Nothing to do once the socket has been closed.
183 if (!socket_.is_open())
184 return;
185
186 if (ec)
187 {
188 // do not initiate shutdown, timers are frequently cancelled
189 if (ec == boost::asio::error::operation_aborted)
190 return;
191
192 // This should never happen
193 JLOG(journal_.error()) << "onTimer: " << ec.message();
194 close();
195 return;
196 }
197 fail("Timeout");
198}
199
// Completion handler for the asynchronous connect.
// On success it re-arms the timer and starts the client-side SSL handshake,
// whose completion is delivered to onHandshake through strand_.
// (The signature line, listing line 201, is absent here; the tooltip index
// gives it as onConnect(error_code ec).)
200void
202{
203 cancelTimer();
204
205 if (ec)
206 {
// An aborted operation returns silently; any other error fails the attempt.
207 if (ec == boost::asio::error::operation_aborted)
208 return;
209
210 fail("onConnect", ec);
211 return;
212 }
213
214 if (!socket_.is_open())
215 return;
216
217 // check if connection has really been established
218 socket_.local_endpoint(ec);
219 if (ec)
220 {
221 fail("onConnect", ec);
222 return;
223 }
224
225 setTimer();
226
// Certificate verification is turned off (verify_none) before handshaking.
227 stream_.set_verify_mode(boost::asio::ssl::verify_none);
228 stream_.async_handshake(
229 boost::asio::ssl::stream_base::client,
230 boost::asio::bind_executor(
231 strand_,
232 std::bind(&ConnectAttempt::onHandshake, shared_from_this(), std::placeholders::_1)));
233}
234
// Completion handler for the SSL handshake.
// On success it notifies the peer finder, derives the shared value from the
// stream, prepares the request in req_, re-arms the timer and writes the
// request; the write completes in onWrite through strand_.
// (The signature line, listing line 236, is absent here; the tooltip index
// gives it as onHandshake(error_code ec). Listing lines 259, 272 and 279 are
// also absent, so three calls below are only partly visible.)
235void
237{
238 cancelTimer();
239 if (!socket_.is_open())
240 return;
241
242 if (ec)
243 {
244 if (ec == boost::asio::error::operation_aborted)
245 return;
246
247 fail("onHandshake", ec);
248 return;
249 }
250
251 auto const localEndpoint = socket_.local_endpoint(ec);
252 if (ec)
253 {
254 fail("onHandshake", ec);
255 return;
256 }
257
// The arguments to onConnected are on the missing listing line 259; a false
// result is treated as a duplicate connection.
258 if (!overlay_.peerFinder().onConnected(
260 {
261 fail("Duplicate connection");
262 return;
263 }
264
265 auto const sharedValue = makeSharedValue(*streamPtr_, journal_);
266 if (!sharedValue)
267 {
268 close(); // makeSharedValue logs
269 return;
270 }
271
// NOTE(review): the call head is on the missing listing line 272; the five
// flags below match makeRequest's parameters in the tooltip index, so this is
// presumably `req_ = makeRequest(` — confirm.
273 !overlay_.peerFinder().config().peerPrivate,
274 app_.config().compression,
275 app_.config().ledgerReplay,
276 app_.config().txReduceRelayEnable,
277 app_.config().vpReduceRelayBaseSquelchEnable);
278
// NOTE(review): the call head is on the missing listing line 279; the six
// arguments match buildHandshake's parameters in the tooltip index — confirm.
280 req_,
281 *sharedValue,
282 overlay_.setup().networkID,
283 overlay_.setup().publicIp,
284 remoteEndpoint_.address(),
285 app_);
286
287 setTimer();
288 boost::beast::http::async_write(
289 stream_,
290 req_,
291 boost::asio::bind_executor(
292 strand_,
293 std::bind(&ConnectAttempt::onWrite, shared_from_this(), std::placeholders::_1)));
294}
295
// Completion handler for the request write.
// On success it starts reading the HTTP response into response_ (buffered in
// readBuf_); the read completes in onRead through strand_.
// (The signature line, listing line 297, is absent here; the tooltip index
// gives it as onWrite(error_code ec).)
296void
298{
299 cancelTimer();
300
301 if (!socket_.is_open())
302 return;
303
304 if (ec)
305 {
306 if (ec == boost::asio::error::operation_aborted)
307 return;
308
309 fail("onWrite", ec);
310 return;
311 }
312
// NOTE(review): unlike the connect, handshake and write stages, no setTimer()
// call precedes this async_read, so the read appears to run without the
// timeout — confirm whether that is intended.
313 boost::beast::http::async_read(
314 stream_,
315 readBuf_,
316 response_,
317 boost::asio::bind_executor(
318 strand_,
319 std::bind(&ConnectAttempt::onRead, shared_from_this(), std::placeholders::_1)));
320}
321
// Completion handler for the response read.
// (The signature line, listing line 323, is absent here; the tooltip index
// gives it as onRead(error_code ec).)
322void
324{
325 cancelTimer();
326
327 if (!socket_.is_open())
328 return;
329
330 if (ec)
331 {
332 if (ec == boost::asio::error::operation_aborted)
333 return;
334
// EOF from the peer: arm the timer and shut the stream down asynchronously;
// the shutdown completes in onShutdown through strand_.
335 if (ec == boost::asio::error::eof)
336 {
337 JLOG(journal_.debug()) << "EOF";
338 setTimer();
339 stream_.async_shutdown(
340 boost::asio::bind_executor(
341 strand_,
342 std::bind(
343 &ConnectAttempt::onShutdown, shared_from_this(), std::placeholders::_1)));
344 return;
345 }
346
347 fail("onRead", ec);
348 return;
349 }
350
// NOTE(review): the success-path statement is on the missing listing line 351;
// presumably it hands the response to processResponse() — confirm.
352}
353
// Completion handler for the asynchronous stream shutdown.
// A clean result or EOF simply closes; any other error is logged via fail()
// (which also closes). (The signature line, listing line 355, is absent here;
// the tooltip index gives it as onShutdown(error_code ec).)
354void
356{
357 cancelTimer();
358 if (!ec)
359 {
360 close();
361 return;
362 }
363
364 if (ec != boost::asio::error::eof)
365 {
366 fail("onShutdown", ec);
367 return;
368 }
369 close();
370}
371
372//--------------------------------------------------------------------------
373
// Handles the peer's HTTP response to the upgrade request.
// Steps visible here: forward any redirect endpoints from a 503 response to
// the peer finder, check the negotiated protocol version, verify the
// handshake, activate the slot, then construct a PeerImp and register it with
// the overlay. Failures go through fail() or close().
// (The function-name line, listing line 375, is absent here; the name is taken
// from the fail() message below. Listing lines 379, 395 and 413 are also
// absent, so two local declarations and one condition are not visible.)
374void
376{
377 if (response_.result() == boost::beast::http::status::service_unavailable)
378 {
// `json` is used below but declared on the missing listing line 379.
380 json::Reader r;
381 std::string s;
// Flatten the response body's buffer sequence into one string for parsing.
382 s.reserve(boost::asio::buffer_size(response_.body().data()));
383 for (auto const buffer : response_.body().data())
384 {
385 s.append(static_cast<char const*>(buffer.data()), boost::asio::buffer_size(buffer));
386 }
387 auto const success = r.parse(s, json);
388 if (success)
389 {
390 if (json.isObject() && json.isMember("peer-ips"))
391 {
392 json::Value const& ips = json["peer-ips"];
393 if (ips.isArray())
394 {
// `eps` is declared on the missing listing line 395. Only string entries that
// parse without error are collected.
396 eps.reserve(ips.size());
397 for (auto const& v : ips)
398 {
399 if (v.isString())
400 {
401 error_code ec;
402 auto const ep = parseEndpoint(v.asString(), ec);
403 if (!ec)
404 eps.push_back(ep);
405 }
406 }
407 overlay_.peerFinder().onRedirects(remoteEndpoint_, eps);
408 }
409 }
410 }
// There is no return in this branch; control continues to the check below.
411 }
412
// NOTE(review): the condition guarding this block is on the missing listing
// line 413; judging by the log text it tests whether the response is a peer
// protocol upgrade — confirm.
414 {
415 JLOG(journal_.info()) << "Unable to upgrade to peer protocol: " << response_.result()
416 << " (" << response_.reason() << ")";
417 close();
418 return;
419 }
420
421 // Just because our peer selected a particular protocol version doesn't
422 // mean that it's acceptable to us. Check that it is:
423 std::optional<ProtocolVersion> negotiatedProtocol;
424
425 {
426 auto const pvs = parseProtocolVersions(response_["Upgrade"]);
427
// Accept only a response naming exactly one version, and only if supported.
428 if (pvs.size() == 1 && isProtocolSupported(pvs[0]))
429 negotiatedProtocol = pvs[0];
430
431 if (!negotiatedProtocol)
432 {
433 fail("processResponse: Unable to negotiate protocol version");
434 return;
435 }
436 }
437
438 auto const sharedValue = makeSharedValue(*streamPtr_, journal_);
439 if (!sharedValue)
440 {
441 close(); // makeSharedValue logs
442 return;
443 }
444
// Any std::exception thrown inside this block is reported as a handshake
// failure by the catch clause below.
445 try
446 {
447 auto const publicKey = verifyHandshake(
448 response_,
449 *sharedValue,
450 overlay_.setup().networkID,
451 overlay_.setup().publicIp,
452 remoteEndpoint_.address(),
453 app_);
454
455 usage_.setPublicKey(publicKey);
456
457 JLOG(journal_.info()) << "Public Key: " << toBase58(TokenType::NodePublic, publicKey);
458
459 JLOG(journal_.debug()) << "Protocol: " << to_string(*negotiatedProtocol);
460
// `member` is truthy when the cluster has an entry for this public key; it is
// passed to activate() as a bool below.
461 auto const member = app_.getCluster().member(publicKey);
462 if (member)
463 {
464 JLOG(journal_.info()) << "Cluster name: " << *member;
465 }
466
467 auto const result =
468 overlay_.peerFinder().activate(slot_, publicKey, static_cast<bool>(member));
469 if (result != PeerFinder::Result::Success)
470 {
471 fail("Outbound " + std::string(to_string(result)));
472 return;
473 }
474
// streamPtr_, slot_ and response_ are passed as rvalues to the new PeerImp, so
// they should be treated as moved-from after this point.
475 auto const peer = std::make_shared<PeerImp>(
476 app_,
477 std::move(streamPtr_),
478 readBuf_.data(),
479 std::move(slot_),
480 std::move(response_),
481 usage_,
482 publicKey,
483 *negotiatedProtocol,
484 id_,
485 overlay_);
486
487 overlay_.addActive(peer);
488 }
489 catch (std::exception const& e)
490 {
491 fail(std::string("Handshake failure (") + e.what() + ")");
492 return;
493 }
494}
495
496} // namespace xrpl
T append(T... args)
T bind(T... args)
A generic endpoint for log messages.
Definition Journal.h:38
Unserialize a JSON document into a Value.
Definition json_reader.h:17
bool parse(std::string const &document, Value &root)
Read a Value from a JSON document.
Represents a JSON value.
Definition json_value.h:130
bool isArray() const
UInt size() const
Number of values in array or object.
boost::system::error_code error_code
boost::beast::ssl_stream< middle_type > stream_type
void fail(std::string const &reason)
boost::asio::basic_waitable_timer< std::chrono::steady_clock > timer_
socket_type & socket_
boost::beast::multi_buffer readBuf_
boost::asio::strand< boost::asio::io_context::executor_type > strand_
static boost::asio::ip::tcp::endpoint parseEndpoint(std::string const &s, boost::system::error_code &ec)
std::shared_ptr< boost::asio::ssl::context > shared_context
void onRead(error_code ec)
endpoint_type remoteEndpoint_
void onConnect(error_code ec)
ConnectAttempt(Application &app, boost::asio::io_context &ioContext, endpoint_type remoteEndpoint, Resource::Consumer usage, shared_context const &context, Peer::id_t id, std::shared_ptr< PeerFinder::Slot > const &slot, beast::Journal journal, OverlayImpl &overlay)
stream_type & stream_
void onShutdown(error_code ec)
std::shared_ptr< PeerFinder::Slot > slot_
response_type response_
boost::asio::ip::tcp::socket socket_type
std::uint32_t const id_
std::unique_ptr< stream_type > streamPtr_
boost::asio::ip::tcp::endpoint endpoint_type
void onHandshake(error_code ec)
Resource::Consumer usage_
beast::WrappedSink sink_
beast::Journal const journal_
void onWrite(error_code ec)
void onTimer(error_code ec)
Child(OverlayImpl &overlay)
static bool isPeerUpgrade(http_request_type const &request)
std::uint32_t id_t
Uniquely identifies a peer.
An endpoint that consumes resources.
Definition Consumer.h:15
T make_shared(T... args)
JSON (JavaScript Object Notation).
Definition json_errors.h:5
STL namespace.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
std::vector< ProtocolVersion > parseProtocolVersions(boost::beast::string_view const &value)
Parse a set of protocol versions.
PublicKey verifyHandshake(boost::beast::http::fields const &headers, xrpl::uint256 const &sharedValue, std::optional< std::uint32_t > networkID, beast::IP::Address publicIp, beast::IP::Address remote, Application &app)
Validate header fields necessary for upgrading the link to the peer protocol.
bool isProtocolSupported(ProtocolVersion const &v)
Determine whether we support a specific protocol version.
std::optional< uint256 > makeSharedValue(stream_type &ssl, beast::Journal journal)
Computes a shared value based on the SSL connection state.
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
Definition AccountID.cpp:93
std::string to_string(BaseUInt< Bits, Tag > const &a)
Definition base_uint.h:633
auto makeRequest(bool crawlPublic, bool comprEnabled, bool ledgerReplayEnabled, bool txReduceRelayEnabled, bool vpReduceRelayEnabled) -> request_type
Make outbound http request.
void buildHandshake(boost::beast::http::fields &h, xrpl::uint256 const &sharedValue, std::optional< std::uint32_t > networkID, beast::IP::Address publicIp, beast::IP::Address remoteIp, Application &app)
Insert fields headers necessary for upgrading the link to the peer protocol.
T push_back(T... args)
T reserve(T... args)
static IP::Endpoint fromAsio(boost::asio::ip::address const &address)
T what(T... args)