rippled
Loading...
Searching...
No Matches
ConnectAttempt.cpp
1#include <xrpld/overlay/Cluster.h>
2#include <xrpld/overlay/detail/ConnectAttempt.h>
3#include <xrpld/overlay/detail/PeerImp.h>
4#include <xrpld/overlay/detail/ProtocolVersion.h>
5
6#include <xrpl/json/json_reader.h>
7
8#include <sstream>
9
10namespace xrpl {
11
// Constructor parameters and member initializers for ConnectAttempt.
// NOTE(review): the initializers below use `id`, `usage` and `slot`, whose
// parameter lines (and the opening line of the signature) are not visible
// in this range - confirm against the class declaration.
13 Application& app,
14 boost::asio::io_context& io_context,
15 endpoint_type const& remote_endpoint,
17 shared_context const& context,
20 beast::Journal journal,
21 OverlayImpl& overlay)
22 : Child(overlay)
23 , app_(app)
24 , id_(id)
// The log sink is built from the journal and a prefix derived from the id.
25 , sink_(journal, OverlayImpl::makePrefix(id))
26 , journal_(sink_)
27 , remote_endpoint_(remote_endpoint)
28 , usage_(usage)
// Completion handlers in this file are bound to this strand.
29 , strand_(boost::asio::make_strand(io_context))
30 , timer_(io_context)
31 , stepTimer_(io_context)
// The stream is heap-allocated: it wraps a new socket created on io_context
// and uses the supplied shared context.
32 , stream_ptr_(
33 std::make_unique<stream_type>(
34 socket_type(std::forward<boost::asio::io_context&>(io_context)),
35 *context))
// socket_ and stream_ are initialized from the object owned by stream_ptr_.
36 , socket_(stream_ptr_->next_layer().socket())
37 , stream_(*stream_ptr_)
38 , slot_(slot)
39{
40}
41
// Body of what is presumably the ConnectAttempt destructor (the signature
// line is not visible here - confirm).
43{
44 // slot_ will be null if we successfully connected
45 // and transferred ownership to a PeerImp
// NOTE(review): the statement guarded by this check is not visible in this
// listing; presumably it reports the still-owned slot as closed to
// PeerFinder - confirm.
46 if (slot_ != nullptr)
48}
49
// Stop the connection attempt.
//
// When invoked off the strand the function returns early; the statement
// preceding that return is not visible here (presumably it re-dispatches
// this call onto strand_ - confirm). On the strand, it does nothing if the
// socket is already closed; otherwise it logs and begins shutdown().
50void
52{
53 if (!strand_.running_in_this_thread())
54 {
56 return;
57 }
58
// Nothing to stop once the socket has been closed.
59 if (!socket_.is_open())
60 return;
61
62 JLOG(journal_.debug()) << "stop: Stop";
63
64 shutdown();
65}
66
// Begin the connection attempt.
//
// When invoked off the strand the function returns early; the statement
// preceding that return is not visible here (presumably it re-dispatches
// this call onto strand_ - confirm). On the strand, it logs the target
// endpoint, marks an I/O operation as pending and starts an asynchronous
// connect whose completion is delivered to onConnect on strand_.
67void
69{
70 if (!strand_.running_in_this_thread())
71 {
73 return;
74 }
75
76 JLOG(journal_.debug()) << "run: connecting to " << remote_endpoint_;
77
// Cleared again at the top of onConnect.
78 ioPending_ = true;
79
80 // Allow up to connectTimeout_ seconds to establish remote peer connection
82
// shared_from_this() keeps this object alive until the handler has run.
83 stream_.next_layer().async_connect(
85 boost::asio::bind_executor(
86 strand_,
87 std::bind(&ConnectAttempt::onConnect, shared_from_this(), std::placeholders::_1)));
88}
89
90//------------------------------------------------------------------------------
91
// Request shutdown of the connection attempt. Must run on strand_
// (asserted).
//
// Does nothing if the socket is already closed. Otherwise sets shutdown_,
// which the completion handlers check before starting their next step, and
// cancels outstanding operations on the lowest stream layer.
// NOTE(review): the final statement of this function is not visible in
// this listing - confirm what follows the cancel.
92void
94{
95 XRPL_ASSERT(
96 strand_.running_in_this_thread(), "xrpl::ConnectAttempt::shutdown: strand in this thread");
97
98 if (!socket_.is_open())
99 return;
100
101 shutdown_ = true;
// Pending async operations complete with operation_aborted.
102 boost::beast::get_lowest_layer(stream_).cancel();
103
105}
106
// Attempt the shutdown sequence. Must run on strand_ (asserted).
//
// Returns without acting while an I/O operation is still pending
// (ioPending_). Otherwise it either starts an asynchronous SSL shutdown
// whose completion is delivered to onShutdown, or closes the socket
// directly.
// NOTE(review): the condition guarding the first early return and the
// condition selecting the async_shutdown branch are not visible in this
// listing - confirm both.
107void
109{
110 XRPL_ASSERT(
111 strand_.running_in_this_thread(),
112 "xrpl::ConnectAttempt::tryAsyncShutdown : strand in this thread");
113
115 return;
116
// Wait for the outstanding operation to complete first.
117 if (ioPending_)
118 return;
119
120 // gracefully shutdown the SSL socket, performing a shutdown handshake
122 {
124 stream_.async_shutdown(bind_executor(
125 strand_,
126 std::bind(&ConnectAttempt::onShutdown, shared_from_this(), std::placeholders::_1)));
127 return;
128 }
129
130 close();
131}
132
// Completion handler for the asynchronous SSL shutdown.
//
// Cancels both timers, logs the error code when it is not one of the
// filtered conditions below, and then closes the socket in every case.
133void
135{
136 cancelTimer();
137
138 if (ec)
139 {
140 // - eof: the stream was cleanly closed
141 // - operation_aborted: an expired timer (slow shutdown)
142 // - stream_truncated: the tcp connection closed (no handshake) it could
143 // occur if a peer does not perform a graceful disconnect
144 // - broken_pipe: the peer is gone
145 // - application data after close notify: benign SSL shutdown condition
// NOTE(review): only eof, operation_aborted and the "application data
// after close notify" message are filtered here; stream_truncated and
// broken_pipe from the list above are still logged at debug level.
146 bool const shouldLog =
147 (ec != boost::asio::error::eof && ec != boost::asio::error::operation_aborted &&
148 ec.message().find("application data after close notify") == std::string::npos);
149
150 if (shouldLog)
151 {
152 JLOG(journal_.debug()) << "onShutdown: " << ec.message();
153 }
154 }
155
156 close();
157}
158
// Close the socket. Must run on strand_ (asserted).
//
// Does nothing if the socket is already closed; otherwise cancels both
// timers and closes the socket.
159void
161{
162 XRPL_ASSERT(
163 strand_.running_in_this_thread(), "xrpl::ConnectAttempt::close : strand in this thread");
164 if (!socket_.is_open())
165 return;
166
167 cancelTimer();
168
// The error_code overload is used and its result is deliberately ignored.
169 error_code ec;
170 socket_.close(ec); // NOLINT(bugprone-unused-return-value)
171}
172
// Fail the connection attempt with a textual reason: logs `reason` at
// debug level, then begins shutdown().
173void
175{
176 JLOG(journal_.debug()) << reason;
177 shutdown();
178}
179
// Fail the connection attempt because of an error code: logs
// "<name>: <ec.message()>" at debug level, then begins shutdown().
// Callers in this file pass the name of the handler that saw the error.
180void
182{
183 JLOG(journal_.debug()) << name << ": " << ec.message();
184 shutdown();
185}
186
// Set timers for the specified connection step.
//
// Records `step` as the current step. The global timer (connectTimeout) is
// armed only when its expiry still equals a default-constructed
// time_point, so it is not re-armed on later steps. The step timer is then
// re-armed with the timeout for `step`. Both timers deliver to onTimer on
// strand_. If a timer operation throws, the exception is logged at error
// level and the socket is closed.
187void
189{
190 currentStep_ = step;
191
192 // Set global timer (only if not already set)
193 if (timer_.expiry() == std::chrono::steady_clock::time_point{})
194 {
195 try
196 {
197 timer_.expires_after(connectTimeout);
198 timer_.async_wait(
199 boost::asio::bind_executor(
200 strand_,
201 std::bind(
202 &ConnectAttempt::onTimer, shared_from_this(), std::placeholders::_1)));
203 }
204 catch (std::exception const& ex)
205 {
206 JLOG(journal_.error()) << "setTimer (global): " << ex.what();
207 close();
208 return;
209 }
210 }
211
212 // Set step-specific timer
213 try
214 {
215 std::chrono::seconds stepTimeout;
// NOTE(review): the case labels are not visible in this listing;
// presumably each assignment belongs to the ConnectionStep matching the
// StepTimeouts constant name - confirm.
216 switch (step)
217 {
219 stepTimeout = StepTimeouts::tcpConnect;
220 break;
222 stepTimeout = StepTimeouts::tlsHandshake;
223 break;
225 stepTimeout = StepTimeouts::httpWrite;
226 break;
228 stepTimeout = StepTimeouts::httpRead;
229 break;
231 stepTimeout = StepTimeouts::tlsShutdown;
232 break;
235 return; // No timer needed for init or complete step
236 }
237
238 // call to expires_after cancels previous timer
239 stepTimer_.expires_after(stepTimeout);
240 stepTimer_.async_wait(
241 boost::asio::bind_executor(
242 strand_,
243 std::bind(&ConnectAttempt::onTimer, shared_from_this(), std::placeholders::_1)));
244
245 JLOG(journal_.trace()) << "setTimer: " << stepToString(step)
246 << " timeout=" << stepTimeout.count() << "s";
247 }
248 catch (std::exception const& ex)
249 {
250 JLOG(journal_.error()) << "setTimer (step " << stepToString(step) << "): " << ex.what();
251 close();
252 return;
253 }
254}
255
// Cancel both global and step timers.
//
// A boost::system::system_error thrown by either cancel() is swallowed on
// purpose. If the first cancel throws, the second is not attempted.
256void
258{
259 try
260 {
261 timer_.cancel();
262 stepTimer_.cancel();
263 }
264 catch (boost::system::system_error const&) // NOLINT(bugprone-empty-catch)
265 {
266 // ignored
267 }
268}
269
// Handle timer expiration events (shared handler for timer_ and
// stepTimer_).
//
// Ignored when the socket is already closed or when the wait was cancelled
// (operation_aborted). Any other error is logged and the socket closed.
// On a genuine expiry it logs which timer fired, together with the current
// step, and closes the socket.
270void
272{
273 if (!socket_.is_open())
274 return;
275
276 if (ec)
277 {
278 // do not initiate shutdown, timers are frequently cancelled
279 if (ec == boost::asio::error::operation_aborted)
280 return;
281
282 // This should never happen
283 JLOG(journal_.error()) << "onTimer: " << ec.message();
284 close();
285 return;
286 }
287
288 // Determine which timer expired by checking their expiry times
289 auto const now = std::chrono::steady_clock::now();
290 bool const globalExpired = (timer_.expiry() <= now);
291 bool const stepExpired = (stepTimer_.expiry() <= now);
292
// The global timeout takes precedence in the log when both have expired.
293 if (globalExpired)
294 {
295 JLOG(journal_.debug()) << "onTimer: Global timeout; step: " << stepToString(currentStep_);
296 }
297 else if (stepExpired)
298 {
299 JLOG(journal_.debug()) << "onTimer: Step timeout; step: " << stepToString(currentStep_);
300 }
301 else
302 {
303 JLOG(journal_.warn()) << "onTimer: Unexpected timer callback";
304 }
305
// The socket is closed in all three cases.
306 close();
307}
308
// Completion handler for the asynchronous TCP connect started in run().
//
// Clears ioPending_, fails the attempt on error, checks that the socket
// really has a local endpoint, and then starts the client-side TLS
// handshake whose completion is delivered to onHandshake on strand_.
// NOTE(review): the statements executed just before the early returns in
// the operation_aborted and shutdown_ branches, and the statement after
// `ioPending_ = true`, are not visible in this listing - confirm.
309void
311{
312 ioPending_ = false;
313
314 if (ec)
315 {
316 if (ec == boost::asio::error::operation_aborted)
317 {
319 return;
320 }
321
322 fail("onConnect", ec);
323 return;
324 }
325
326 if (!socket_.is_open())
327 return;
328
329 // check if connection has really been established
330 socket_.local_endpoint(ec);
331 if (ec)
332 {
333 fail("onConnect", ec);
334 return;
335 }
336
// A shutdown was requested while the connect was in flight.
337 if (shutdown_)
338 {
340 return;
341 }
342
343 ioPending_ = true;
344
346
// verify_none: the peer certificate is not verified by the TLS layer.
// Presumably the peer is authenticated later by verifyHandshake in
// processResponse - confirm.
347 stream_.set_verify_mode(boost::asio::ssl::verify_none);
348 stream_.async_handshake(
349 boost::asio::ssl::stream_base::client,
350 boost::asio::bind_executor(
351 strand_,
352 std::bind(&ConnectAttempt::onHandshake, shared_from_this(), std::placeholders::_1)));
353}
354
// Completion handler for the TLS handshake started in onConnect.
//
// Clears ioPending_, fails the attempt on error, obtains the local
// endpoint, rejects self-connections, derives the shared value from the
// TLS session via makeSharedValue, and finally writes the HTTP request
// req_ asynchronously, with completion delivered to onWrite on strand_.
// NOTE(review): several lines are not visible in this listing: the
// statements before the early returns in the operation_aborted and
// shutdown_ branches, the condition of the self-connection check, and the
// statements that build req_ (only trailing call arguments remain) -
// confirm all of these.
355void
357{
358 ioPending_ = false;
359
360 if (ec)
361 {
362 if (ec == boost::asio::error::operation_aborted)
363 {
365 return;
366 }
367
368 fail("onHandshake", ec);
369 return;
370 }
371
372 auto const local_endpoint = socket_.local_endpoint(ec);
373 if (ec)
374 {
375 fail("onHandshake", ec);
376 return;
377 }
378
380
381 // check if we connected to ourselves
384 {
385 fail("Self connection");
386 return;
387 }
388
// An empty optional means the shared value could not be computed.
389 auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
390 if (!sharedValue)
391 {
392 shutdown();
393 return; // makeSharedValue logs
394 }
395
402
404 req_,
405 *sharedValue,
408 remote_endpoint_.address(),
409 app_);
410
// A shutdown was requested while the handshake was in flight.
411 if (shutdown_)
412 {
414 return;
415 }
416
417 ioPending_ = true;
418
419 boost::beast::http::async_write(
420 stream_,
421 req_,
422 boost::asio::bind_executor(
423 strand_,
424 std::bind(&ConnectAttempt::onWrite, shared_from_this(), std::placeholders::_1)));
425}
426
// Completion handler for the HTTP request write started in onHandshake.
//
// Clears ioPending_, fails the attempt on error, and otherwise starts an
// asynchronous read of the HTTP response into response_ (using read_buf_),
// with completion delivered to onRead on strand_.
// NOTE(review): the statements before the early returns in the
// operation_aborted and shutdown_ branches, and the statement after
// `ioPending_ = true`, are not visible in this listing - confirm.
427void
429{
430 ioPending_ = false;
431
432 if (ec)
433 {
434 if (ec == boost::asio::error::operation_aborted)
435 {
437 return;
438 }
439
440 fail("onWrite", ec);
441 return;
442 }
443
// A shutdown was requested while the write was in flight.
444 if (shutdown_)
445 {
447 return;
448 }
449
450 ioPending_ = true;
451
453
454 boost::beast::http::async_read(
455 stream_,
456 read_buf_,
457 response_,
458 boost::asio::bind_executor(
459 strand_,
460 std::bind(&ConnectAttempt::onRead, shared_from_this(), std::placeholders::_1)));
461}
462
// Completion handler for the HTTP response read started in onWrite.
//
// Cancels both timers and clears ioPending_. EOF is logged and leads to
// shutdown(); other errors fail the attempt.
// NOTE(review): the statements before the early returns in the
// operation_aborted and shutdown_ branches, and the final statement of the
// success path, are not visible in this listing; presumably the success
// path hands off to processResponse - confirm.
463void
465{
466 cancelTimer();
467 ioPending_ = false;
469
470 if (ec)
471 {
// The peer closed the stream before a complete response was read.
472 if (ec == boost::asio::error::eof)
473 {
474 JLOG(journal_.debug()) << "EOF";
475 shutdown();
476 return;
477 }
478
479 if (ec == boost::asio::error::operation_aborted)
480 {
482 return;
483 }
484
485 fail("onRead", ec);
486 return;
487 }
488
489 if (shutdown_)
490 {
492 return;
493 }
494
496}
497
498//--------------------------------------------------------------------------
499
// Process the HTTP upgrade response from peer.
//
// Two paths are visible:
//  1. A first braced block that handles a response which did not upgrade
//     to the peer protocol (per its log messages). Anything other than
//     503 service_unavailable is logged and shut down. A 503 whose JSON
//     body has a "peer-ips" array is treated as a redirect: the parseable
//     endpoints are reported to PeerFinder and the attempt fails.
//  2. Otherwise the negotiated protocol version is validated, the shared
//     value is recomputed, the handshake is verified, a slot is activated
//     with PeerFinder, and a PeerImp is created and registered with the
//     overlay.
// Exceptions thrown inside the try block fail the attempt with a
// "Handshake failure" reason.
// NOTE(review): several lines are not visible in this listing: the
// condition guarding the first braced block, the declarations of
// `redirectEndpoints` and `ss`, two arguments of the verifyHandshake call,
// and the statement before the early return in the shutdown_ branch -
// confirm all of these.
500void
502{
504 {
505 // A peer may respond with service_unavailable and a list of alternative
506 // peers to connect to, a differing status code is unexpected
507 if (response_.result() != boost::beast::http::status::service_unavailable)
508 {
509 JLOG(journal_.warn()) << "Unable to upgrade to peer protocol: " << response_.result()
510 << " (" << response_.reason() << ")";
511 shutdown();
512 return;
513 }
514
515 // Parse response body to determine if this is a redirect or other
516 // service unavailable
// The body may span several buffers; they are concatenated into one
// contiguous string for the JSON reader.
517 std::string responseBody;
518 responseBody.reserve(boost::asio::buffer_size(response_.body().data()));
519 for (auto const buffer : response_.body().data())
520 {
521 responseBody.append(
522 static_cast<char const*>(buffer.data()), boost::asio::buffer_size(buffer));
523 }
524
525 Json::Value json;
526 Json::Reader reader;
527 auto const isValidJson = reader.parse(responseBody, json);
528
529 // Check if this is a redirect response (contains peer-ips field)
530 auto const isRedirect = isValidJson && json.isObject() && json.isMember("peer-ips");
531
532 if (!isRedirect)
533 {
534 JLOG(journal_.warn()) << "processResponse: " << remote_endpoint_
535 << " failed to upgrade to peer protocol: " << response_.result()
536 << " (" << response_.reason() << ")";
537
538 shutdown();
539 return;
540 }
541
542 Json::Value const& peerIps = json["peer-ips"];
543 if (!peerIps.isArray())
544 {
545 fail("processResponse: invalid peer-ips format");
546 return;
547 }
548
549 // Extract and validate peer endpoints
551 redirectEndpoints.reserve(peerIps.size());
552
// Entries that are not strings or that fail to parse are skipped silently.
553 for (auto const& ipValue : peerIps)
554 {
555 if (!ipValue.isString())
556 continue;
557
558 error_code ec;
559 auto const endpoint = parse_endpoint(ipValue.asString(), ec);
560 if (!ec)
561 redirectEndpoints.push_back(endpoint);
562 }
563
564 // Notify PeerFinder about the redirect; redirectEndpoints may be empty
565 overlay_.peerFinder().onRedirects(remote_endpoint_, redirectEndpoints);
566
567 fail("processResponse: failed to connect to peer: redirected");
568 return;
569 }
570
571 // Just because our peer selected a particular protocol version doesn't
572 // mean that it's acceptable to us. Check that it is:
573 std::optional<ProtocolVersion> negotiatedProtocol;
574
575 {
576 auto const pvs = parseProtocolVersions(response_["Upgrade"]);
577
// Exactly one version must be present, and it must be one we support.
578 if (pvs.size() == 1 && isProtocolSupported(pvs[0]))
579 negotiatedProtocol = pvs[0];
580
581 if (!negotiatedProtocol)
582 {
583 fail("processResponse: Unable to negotiate protocol version");
584 return;
585 }
586 }
587
588 auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
589 if (!sharedValue)
590 {
591 shutdown();
592 return; // makeSharedValue logs
593 }
594
595 try
596 {
597 auto const publicKey = verifyHandshake(
598 response_,
599 *sharedValue,
602 remote_endpoint_.address(),
603 app_);
604
605 usage_.setPublicKey(publicKey);
606
607 JLOG(journal_.debug()) << "Protocol: " << to_string(*negotiatedProtocol);
608 JLOG(journal_.info()) << "Public Key: " << toBase58(TokenType::NodePublic, publicKey);
609
// member is an optional cluster name; cluster membership is passed to
// activate() as the `reserved` flag.
610 auto const member = app_.getCluster().member(publicKey);
611 if (member)
612 {
613 JLOG(journal_.info()) << "Cluster name: " << *member;
614 }
615
616 auto const result = overlay_.peerFinder().activate(slot_, publicKey, member.has_value());
617 if (result != PeerFinder::Result::success)
618 {
620 ss << "Outbound Connect Attempt " << remote_endpoint_ << " " << to_string(result);
621 fail(ss.str());
622 return;
623 }
624
625 if (!socket_.is_open())
626 return;
627
628 if (shutdown_)
629 {
631 return;
632 }
633
// Ownership of the stream, slot and response moves into the new PeerImp;
// stream_ptr_, slot_ and response_ are moved-from after this point.
634 auto const peer = std::make_shared<PeerImp>(
635 app_,
636 std::move(stream_ptr_),
637 read_buf_.data(),
638 std::move(slot_),
639 std::move(response_),
640 usage_,
641 publicKey,
642 *negotiatedProtocol,
643 id_,
644 overlay_);
645
646 overlay_.add_active(peer);
647 }
648 catch (std::exception const& e)
649 {
650 fail(std::string("Handshake failure (") + e.what() + ")");
651 return;
652 }
653}
654
655} // namespace xrpl
T append(T... args)
T bind(T... args)
Unserialize a JSON document into a Value.
Definition json_reader.h:17
bool parse(std::string const &document, Value &root)
Read a Value from a JSON document.
Represents a JSON value.
Definition json_value.h:130
bool isArray() const
UInt size() const
Number of values in array or object.
bool isObject() const
bool isMember(char const *key) const
Return true if the object has a member named key.
A generic endpoint for log messages.
Definition Journal.h:40
Stream error() const
Definition Journal.h:319
Stream debug() const
Definition Journal.h:301
Stream info() const
Definition Journal.h:307
Stream trace() const
Severity stream access functions.
Definition Journal.h:295
Stream warn() const
Definition Journal.h:313
virtual Config & config()=0
std::optional< std::string > member(PublicKey const &node) const
Determines whether a node belongs in the cluster.
Definition Cluster.cpp:19
bool COMPRESSION
Definition Config.h:205
bool TX_REDUCE_RELAY_ENABLE
Definition Config.h:243
bool VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE
Definition Config.h:233
bool LEDGER_REPLAY
Definition Config.h:208
boost::system::error_code error_code
boost::beast::ssl_stream< middle_type > stream_type
void fail(std::string const &reason)
boost::asio::basic_waitable_timer< std::chrono::steady_clock > timer_
void setTimer(ConnectionStep step)
Set timers for the specified connection step.
void processResponse()
Process the HTTP upgrade response from peer.
boost::asio::strand< boost::asio::io_context::executor_type > strand_
void stop() override
Stop the connection attempt.
ConnectionStep currentStep_
void onRead(error_code ec)
static std::string stepToString(ConnectionStep step)
std::unique_ptr< stream_type > stream_ptr_
void onConnect(error_code ec)
void onShutdown(error_code ec)
std::shared_ptr< PeerFinder::Slot > slot_
ConnectionStep
Represents the current phase of the connection establishment process.
response_type response_
boost::asio::ip::tcp::socket socket_type
boost::beast::multi_buffer read_buf_
void run()
Begin the connection attempt.
Peer::id_t const id_
boost::asio::ip::tcp::endpoint endpoint_type
endpoint_type remote_endpoint_
boost::asio::basic_waitable_timer< std::chrono::steady_clock > stepTimer_
void onHandshake(error_code ec)
static boost::asio::ip::tcp::endpoint parse_endpoint(std::string const &s, boost::system::error_code &ec)
ConnectAttempt(Application &app, boost::asio::io_context &io_context, endpoint_type const &remote_endpoint, Resource::Consumer usage, shared_context const &context, Peer::id_t id, std::shared_ptr< PeerFinder::Slot > const &slot, beast::Journal journal, OverlayImpl &overlay)
Construct a new ConnectAttempt object.
static constexpr std::chrono::seconds connectTimeout
Resource::Consumer usage_
beast::Journal const journal_
void onWrite(error_code ec)
void onTimer(error_code ec)
Handle timer expiration events.
void cancelTimer()
Cancel both global and step timers.
static bool isPeerUpgrade(http_request_type const &request)
void add_active(std::shared_ptr< PeerImp > const &peer)
PeerFinder::Manager & peerFinder()
Setup const & setup() const
virtual void onRedirects(boost::asio::ip::tcp::endpoint const &remote_address, std::vector< boost::asio::ip::tcp::endpoint > const &eps)=0
Called when we received redirect IPs from a busy peer.
virtual Result activate(std::shared_ptr< Slot > const &slot, PublicKey const &key, bool reserved)=0
Request an active slot type.
virtual void on_closed(std::shared_ptr< Slot > const &slot)=0
Called when the slot is closed.
virtual Config config()=0
Returns the configuration for the manager.
virtual bool onConnected(std::shared_ptr< Slot > const &slot, beast::IP::Endpoint const &local_endpoint)=0
Called when an outbound connection attempt succeeds.
An endpoint that consumes resources.
Definition Consumer.h:16
void setPublicKey(PublicKey const &publicKey)
Definition Consumer.cpp:132
virtual Cluster & getCluster()=0
T count(T... args)
T is_same_v
STL namespace.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
std::vector< ProtocolVersion > parseProtocolVersions(boost::beast::string_view const &value)
Parse a set of protocol versions.
bool isProtocolSupported(ProtocolVersion const &v)
Determine whether we support a specific protocol version.
std::optional< uint256 > makeSharedValue(stream_type &ssl, beast::Journal journal)
Computes a shared value based on the SSL connection state.
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:602
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
Definition AccountID.cpp:92
auto makeRequest(bool crawlPublic, bool comprEnabled, bool ledgerReplayEnabled, bool txReduceRelayEnabled, bool vpReduceRelayEnabled) -> request_type
Make outbound http request.
PublicKey verifyHandshake(boost::beast::http::fields const &headers, xrpl::uint256 const &sharedValue, std::optional< std::uint32_t > networkID, beast::IP::Address public_ip, beast::IP::Address remote, Application &app)
Validate header fields necessary for upgrading the link to the peer protocol.
void buildHandshake(boost::beast::http::fields &h, xrpl::uint256 const &sharedValue, std::optional< std::uint32_t > networkID, beast::IP::Address public_ip, beast::IP::Address remote_ip, Application &app)
Insert fields headers necessary for upgrading the link to the peer protocol.
T push_back(T... args)
T reserve(T... args)
T str(T... args)
static IP::Endpoint from_asio(boost::asio::ip::address const &address)
static constexpr std::chrono::seconds tcpConnect
static constexpr std::chrono::seconds tlsHandshake
static constexpr std::chrono::seconds httpWrite
static constexpr std::chrono::seconds httpRead
static constexpr std::chrono::seconds tlsShutdown
std::optional< std::uint32_t > networkID
Definition Overlay.h:52
beast::IP::Address public_ip
Definition Overlay.h:49
bool peerPrivate
true if we want our IP address kept private.
T what(T... args)