rippled
Loading...
Searching...
No Matches
ConnectAttempt.cpp
1#include <xrpld/overlay/Cluster.h>
2#include <xrpld/overlay/detail/ConnectAttempt.h>
3#include <xrpld/overlay/detail/PeerImp.h>
4#include <xrpld/overlay/detail/ProtocolVersion.h>
5
6#include <xrpl/json/json_reader.h>
7
8#include <sstream>
9
10namespace ripple {
11
// Constructor parameter list and member initializers for ConnectAttempt.
// NOTE(review): the opening `ConnectAttempt::ConnectAttempt(` line and the
// `usage`, `id` and `slot` parameters are not shown in this listing, although
// they are used by the initializers below.
13 Application& app,
14 boost::asio::io_context& io_context,
15 endpoint_type const& remote_endpoint,
17 shared_context const& context,
20 beast::Journal journal,
21 OverlayImpl& overlay)
22 : Child(overlay)
23 , app_(app)
24 , id_(id)
// Log sink built from the journal and a prefix derived from the id.
25 , sink_(journal, OverlayImpl::makePrefix(id))
26 , journal_(sink_)
27 , remote_endpoint_(remote_endpoint)
28 , usage_(usage)
// Completion handlers in this file are bound to this strand.
29 , strand_(boost::asio::make_strand(io_context))
// Global timer and per-step timer (see the "Set global timer" and
// "Set step-specific timer" sections further down).
30 , timer_(io_context)
31 , stepTimer_(io_context)
// The stream is heap-allocated so that it can later be moved out via
// std::move(stream_ptr_) when a PeerImp is created.
32 , stream_ptr_(std::make_unique<stream_type>(
33 socket_type(std::forward<boost::asio::io_context&>(io_context)),
34 *context))
// socket_ and stream_ refer to parts of the object held by stream_ptr_.
35 , socket_(stream_ptr_->next_layer().socket())
36 , stream_(*stream_ptr_)
37 , slot_(slot)
38{
39}
40
// Presumably the body of the ConnectAttempt destructor (the signature line is
// missing from this listing — TODO confirm).
42{
43 // slot_ will be null if we successfully connected
44 // and transferred ownership to a PeerImp
45 if (slot_ != nullptr)
// NOTE(review): the statement guarded by the `if` above (original line 46) is
// missing from this listing; presumably it hands the slot back to PeerFinder.
47}
48
// Stop the connection attempt.
// (Signature line `ConnectAttempt::stop()` is missing from this listing.)
49void
51{
// When called off the strand, the work is posted instead of run inline.
// NOTE(review): the arguments of the post() call (original line 54) are
// missing here; presumably they re-dispatch this function onto strand_.
52 if (!strand_.running_in_this_thread())
53 return boost::asio::post(
55
// Nothing to stop once the socket has been closed.
56 if (!socket_.is_open())
57 return;
58
59 JLOG(journal_.debug()) << "stop: Stop";
60
61 shutdown();
62}
63
// Begin the connection attempt: start an asynchronous connect on the layer
// beneath the SSL stream.
// (Signature line `ConnectAttempt::run()` is missing from this listing.)
64void
66{
// When called off the strand, the work is posted instead of run inline.
// NOTE(review): the arguments of the post() call (original line 69) are
// missing here; presumably they re-dispatch this function onto strand_.
67 if (!strand_.running_in_this_thread())
68 return boost::asio::post(
70
71 JLOG(journal_.debug()) << "run: connecting to " << remote_endpoint_;
72
// Mark an asynchronous operation as outstanding; tryAsyncShutdown() returns
// early while this flag is set.
73 ioPending_ = true;
74
75 // Allow up to connectTimeout_ seconds to establish remote peer connection
// NOTE(review): the statement that follows this comment (original line 76)
// is missing from this listing; presumably it arms the timers.
77
// The completion handler is bound to strand_. The endpoint argument and the
// handler target (original lines 79 and 82-84) are missing from this listing.
78 stream_.next_layer().async_connect(
80 boost::asio::bind_executor(
81 strand_,
85 std::placeholders::_1)));
86}
87
88//------------------------------------------------------------------------------
89
// Request shutdown of the attempt: set shutdown_ and cancel outstanding
// operations on the lowest layer of the stream. Must run on strand_.
// (Signature line is missing from this listing; the name is taken from the
// assertion message below.)
90void
92{
93 XRPL_ASSERT(
94 strand_.running_in_this_thread(),
95 "ripple::ConnectAttempt::shutdown: strand in this thread");
96
// Already closed: nothing to shut down.
97 if (!socket_.is_open())
98 return;
99
// The I/O handlers in this file check shutdown_ and treat operation_aborted
// as a cue to call tryAsyncShutdown().
100 shutdown_ = true;
101 boost::beast::get_lowest_layer(stream_).cancel();
102
// NOTE(review): the final statement (original line 103) is missing from this
// listing; presumably it calls tryAsyncShutdown().
104}
105
// Attempt the shutdown once no asynchronous operation is outstanding:
// either start an SSL shutdown handshake or close the socket directly.
// Must run on strand_.
// (Signature line is missing from this listing; the name is taken from the
// assertion message below.)
106void
108{
109 XRPL_ASSERT(
110 strand_.running_in_this_thread(),
111 "ripple::ConnectAttempt::tryAsyncShutdown : strand in this thread");
112
// NOTE(review): the condition guarding this early return (original line 113)
// is missing from this listing.
114 return;
115
// An operation is still in flight; its handler clears ioPending_ and calls
// back into this function on operation_aborted or when shutdown_ is set.
116 if (ioPending_)
117 return;
118
119 // gracefully shutdown the SSL socket, performing a shutdown handshake
// NOTE(review): the condition for this block (original lines 120-121) and
// the statement at line 123 are missing from this listing, as is the bound
// handler target (lines 127-128).
122 {
124 return stream_.async_shutdown(bind_executor(
125 strand_,
126 std::bind(
129 std::placeholders::_1)));
130 }
131
// Reached only when the graceful-shutdown block above did not return.
132 close();
133}
134
// Handler taking an error_code `ec` (see use below), named onShutdown per the
// log prefix; presumably the completion handler of the SSL async_shutdown.
// Cancels the timers, optionally logs the error, and always closes the socket.
// (Signature line is missing from this listing.)
135void
137{
138 cancelTimer();
139
140 if (ec)
141 {
142 // - eof: the stream was cleanly closed
143 // - operation_aborted: an expired timer (slow shutdown)
144 // - stream_truncated: the tcp connection closed (no handshake); it can
145 // occur if a peer does not perform a graceful disconnect
146 // - broken_pipe: the peer is gone
147 // - application data after close notify: benign SSL shutdown condition
// NOTE(review): only eof, operation_aborted and the "application data after
// close notify" message are suppressed by the condition below;
// stream_truncated and broken_pipe are described above but are still logged.
148 bool shouldLog =
149 (ec != boost::asio::error::eof &&
150 ec != boost::asio::error::operation_aborted &&
151 ec.message().find("application data after close notify") ==
152 std::string::npos);
153
154 if (shouldLog)
155 {
156 JLOG(journal_.debug()) << "onShutdown: " << ec.message();
157 }
158 }
159
// The socket is closed regardless of how the shutdown completed.
160 close();
161}
162
// Close the socket and cancel both timers. Must run on strand_.
// Does nothing if the socket is already closed.
// (Signature line is missing from this listing; the name is taken from the
// assertion message below.)
163void
165{
166 XRPL_ASSERT(
167 strand_.running_in_this_thread(),
168 "ripple::ConnectAttempt::close : strand in this thread");
169 if (!socket_.is_open())
170 return;
171
172 cancelTimer();
173
// The error_code overload is used and the result is deliberately not
// inspected, so a failure to close is ignored.
174 error_code ec;
175 socket_.close(ec);
176}
177
// fail(std::string const& reason): log the reason at debug level and begin
// shutting the attempt down.
// (Signature line is missing from this listing.)
178void
180{
181 JLOG(journal_.debug()) << reason;
182 shutdown();
183}
184
// Presumably the fail(name, ec) overload (signature line is missing from this
// listing — TODO confirm): logs "<name>: <error message>" at debug level and
// begins shutting the attempt down.
185void
187{
188 JLOG(journal_.debug()) << name << ": " << ec.message();
189 shutdown();
190}
191
// Set timers for the specified connection step: records the step, arms the
// global timer once, and (re)arms the step timer with a per-step timeout.
// Any exception thrown while arming a timer is logged and the socket closed.
// (Signature line `ConnectAttempt::setTimer(ConnectionStep step)` is missing
// from this listing.)
192void
194{
195 currentStep_ = step;
196
197 // Set global timer (only if not already set)
// A default-constructed time_point is used as the "never armed" marker.
198 if (timer_.expiry() == std::chrono::steady_clock::time_point{})
199 {
200 try
201 {
202 timer_.expires_after(connectTimeout);
// NOTE(review): the bound handler target (original lines 206-207) is missing
// from this listing; presumably onTimer.
203 timer_.async_wait(boost::asio::bind_executor(
204 strand_,
205 std::bind(
208 std::placeholders::_1)));
209 }
210 catch (std::exception const& ex)
211 {
212 JLOG(journal_.error()) << "setTimer (global): " << ex.what();
213 return close();
214 }
215 }
216
217 // Set step-specific timer
218 try
219 {
220 std::chrono::seconds stepTimeout;
// NOTE(review): the `case` labels of this switch (original lines 223, 226,
// 229, 232, 235, 238-239) are missing from this listing; each assignment
// below belongs to one connection step.
221 switch (step)
222 {
224 stepTimeout = StepTimeouts::tcpConnect;
225 break;
227 stepTimeout = StepTimeouts::tlsHandshake;
228 break;
230 stepTimeout = StepTimeouts::httpWrite;
231 break;
233 stepTimeout = StepTimeouts::httpRead;
234 break;
236 stepTimeout = StepTimeouts::tlsShutdown;
237 break;
240 return; // No timer needed for init or complete step
241 }
242
243 // call to expires_after cancels previous timer
244 stepTimer_.expires_after(stepTimeout);
// NOTE(review): the bound handler target (original lines 248-249) is missing
// from this listing; presumably onTimer.
245 stepTimer_.async_wait(boost::asio::bind_executor(
246 strand_,
247 std::bind(
250 std::placeholders::_1)));
251
252 JLOG(journal_.trace()) << "setTimer: " << stepToString(step)
253 << " timeout=" << stepTimeout.count() << "s";
254 }
255 catch (std::exception const& ex)
256 {
257 JLOG(journal_.error())
258 << "setTimer (step " << stepToString(step) << "): " << ex.what();
259 return close();
260 }
261}
262
// Cancel both global and step timers.
// (Signature line `ConnectAttempt::cancelTimer()` is missing from this
// listing.)
263void
265{
266 try
267 {
268 timer_.cancel();
269 stepTimer_.cancel();
270 }
// A boost::system::system_error from either cancel() is swallowed; note that
// if timer_.cancel() throws, stepTimer_.cancel() is not attempted.
271 catch (boost::system::system_error const&)
272 {
273 // ignored
274 }
275}
276
// Handle timer expiration events: on a genuine expiry, log which timer fired
// and close the socket.
// (Signature line `ConnectAttempt::onTimer(error_code ec)` is missing from
// this listing.)
277void
279{
// The attempt has already been torn down; nothing left to time out.
280 if (!socket_.is_open())
281 return;
282
283 if (ec)
284 {
285 // do not initiate shutdown, timers are frequently cancelled
286 if (ec == boost::asio::error::operation_aborted)
287 return;
288
289 // This should never happen
290 JLOG(journal_.error()) << "onTimer: " << ec.message();
291 return close();
292 }
293
294 // Determine which timer expired by checking their expiry times
295 auto const now = std::chrono::steady_clock::now();
296 bool globalExpired = (timer_.expiry() <= now);
297 bool stepExpired = (stepTimer_.expiry() <= now);
298
// When both have expired, the global timeout is the one reported.
299 if (globalExpired)
300 {
301 JLOG(journal_.debug())
302 << "onTimer: Global timeout; step: " << stepToString(currentStep_);
303 }
304 else if (stepExpired)
305 {
306 JLOG(journal_.debug())
307 << "onTimer: Step timeout; step: " << stepToString(currentStep_);
308 }
309 else
310 {
311 JLOG(journal_.warn()) << "onTimer: Unexpected timer callback";
312 }
313
// The socket is closed in all three cases, including the unexpected one.
314 close();
315}
316
// Handler for the asynchronous connect (onConnect(error_code ec)): validates
// the connection and starts the client-side SSL handshake.
// (Signature line is missing from this listing.)
317void
319{
// The connect operation has completed, so no I/O is outstanding.
320 ioPending_ = false;
321
322 if (ec)
323 {
// A cancelled operation means shutdown is in progress; continue it.
324 if (ec == boost::asio::error::operation_aborted)
325 return tryAsyncShutdown();
326
327 return fail("onConnect", ec);
328 }
329
330 if (!socket_.is_open())
331 return;
332
333 // check if connection has really been established
334 socket_.local_endpoint(ec);
335 if (ec)
336 return fail("onConnect", ec);
337
338 if (shutdown_)
339 return tryAsyncShutdown();
340
341 ioPending_ = true;
342
// NOTE(review): the statement at original line 343 is missing from this
// listing; presumably it arms the timer for the handshake step.
344
// No certificate verification is requested from the SSL layer here.
345 stream_.set_verify_mode(boost::asio::ssl::verify_none);
// NOTE(review): the bound handler target (original lines 351-352) is missing
// from this listing; presumably onHandshake.
346 stream_.async_handshake(
347 boost::asio::ssl::stream_base::client,
348 boost::asio::bind_executor(
349 strand_,
350 std::bind(
353 std::placeholders::_1)));
354}
355
// Handler for the SSL handshake (onHandshake(error_code ec)): derives the
// shared value from the SSL session and writes the HTTP request req_ to the
// peer.
// (Signature line is missing from this listing, along with several interior
// lines noted below.)
356void
358{
359 ioPending_ = false;
360
361 if (ec)
362 {
// A cancelled operation means shutdown is in progress; continue it.
363 if (ec == boost::asio::error::operation_aborted)
364 return tryAsyncShutdown();
365
366 return fail("onHandshake", ec);
367 }
368
369 auto const local_endpoint = socket_.local_endpoint(ec);
370 if (ec)
371 return fail("onHandshake", ec);
372
// NOTE(review): original line 373 is missing from this listing.
374
375 // check if we connected to ourselves
// NOTE(review): the condition for this failure (original lines 376-377) is
// missing from this listing.
378 return fail("Self connection");
379
380 auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
381 if (!sharedValue)
382 return shutdown(); // makeSharedValue logs
383
// NOTE(review): original lines 384-389 and 391 are missing from this listing;
// presumably they construct req_ and begin the call whose arguments follow.
390
392 req_,
393 *sharedValue,
396 remote_endpoint_.address(),
397 app_);
398
399 if (shutdown_)
400 return tryAsyncShutdown();
401
402 ioPending_ = true;
403
// NOTE(review): the bound handler target (original lines 410-411) is missing
// from this listing; presumably onWrite.
404 boost::beast::http::async_write(
405 stream_,
406 req_,
407 boost::asio::bind_executor(
408 strand_,
409 std::bind(
412 std::placeholders::_1)));
413}
414
// Handler for the HTTP request write (onWrite(error_code ec)): on success,
// starts reading the HTTP response into response_.
// (Signature line is missing from this listing.)
415void
417{
418 ioPending_ = false;
419
420 if (ec)
421 {
// A cancelled operation means shutdown is in progress; continue it.
422 if (ec == boost::asio::error::operation_aborted)
423 return tryAsyncShutdown();
424
425 return fail("onWrite", ec);
426 }
427
428 if (shutdown_)
429 return tryAsyncShutdown();
430
431 ioPending_ = true;
432
// NOTE(review): the statement at original line 433 is missing from this
// listing; presumably it arms the timer for the read step.
434
// read_buf_ is the buffer for the read; it is later handed to PeerImp via
// read_buf_.data().
// NOTE(review): the bound handler target (original lines 442-443) is missing
// from this listing; presumably onRead.
435 boost::beast::http::async_read(
436 stream_,
437 read_buf_,
438 response_,
439 boost::asio::bind_executor(
440 strand_,
441 std::bind(
444 std::placeholders::_1)));
445}
446
// Handler for the HTTP response read (onRead(error_code ec)): cancels the
// timers and routes errors to the appropriate shutdown path.
// (Signature line is missing from this listing.)
447void
449{
450 cancelTimer();
451 ioPending_ = false;
// NOTE(review): the statement at original line 452 is missing from this
// listing.
453
454 if (ec)
455 {
// The peer closed the connection before a full response arrived.
456 if (ec == boost::asio::error::eof)
457 {
458 JLOG(journal_.debug()) << "EOF";
459 return shutdown();
460 }
461
// A cancelled operation means shutdown is in progress; continue it.
462 if (ec == boost::asio::error::operation_aborted)
463 return tryAsyncShutdown();
464
465 return fail("onRead", ec);
466 }
467
468 if (shutdown_)
469 return tryAsyncShutdown();
470
// NOTE(review): the final statement (original line 471) is missing from this
// listing; presumably it calls processResponse().
472}
473
474//--------------------------------------------------------------------------
475
// Process the HTTP upgrade response from peer.
// Two outcomes are handled: a non-upgrade response (rejected, with any
// redirect endpoints passed to PeerFinder), and an upgrade response, which is
// validated and turned into an active PeerImp.
// (Signature line `ConnectAttempt::processResponse()` is missing from this
// listing, along with several interior lines noted below.)
476void
478{
// NOTE(review): the condition opening this block (original line 479) is
// missing from this listing; presumably it tests that the response is not a
// peer-protocol upgrade.
480 {
481 // A peer may respond with service_unavailable and a list of alternative
482 // peers to connect to, a differing status code is unexpected
483 if (response_.result() !=
484 boost::beast::http::status::service_unavailable)
485 {
486 JLOG(journal_.warn())
487 << "Unable to upgrade to peer protocol: " << response_.result()
488 << " (" << response_.reason() << ")";
489 return shutdown();
490 }
491
492 // Parse response body to determine if this is a redirect or other
493 // service unavailable
// Flatten the body's buffer sequence into one contiguous string for the
// JSON reader.
494 std::string responseBody;
495 responseBody.reserve(boost::asio::buffer_size(response_.body().data()));
496 for (auto const buffer : response_.body().data())
497 responseBody.append(
498 static_cast<char const*>(buffer.data()),
499 boost::asio::buffer_size(buffer));
500
501 Json::Value json;
502 Json::Reader reader;
503 auto const isValidJson = reader.parse(responseBody, json);
504
505 // Check if this is a redirect response (contains peer-ips field)
506 auto const isRedirect =
507 isValidJson && json.isObject() && json.isMember("peer-ips");
508
509 if (!isRedirect)
510 {
511 JLOG(journal_.warn())
512 << "processResponse: " << remote_endpoint_
513 << " failed to upgrade to peer protocol: " << response_.result()
514 << " (" << response_.reason() << ")";
515
516 return shutdown();
517 }
518
519 Json::Value const& peerIps = json["peer-ips"];
520 if (!peerIps.isArray())
521 return fail("processResponse: invalid peer-ips format");
522
523 // Extract and validate peer endpoints
// NOTE(review): the declaration of redirectEndpoints (original line 524) is
// missing from this listing.
525 redirectEndpoints.reserve(peerIps.size());
526
// Entries that are not strings, or that fail to parse as endpoints, are
// skipped silently.
527 for (auto const& ipValue : peerIps)
528 {
529 if (!ipValue.isString())
530 continue;
531
532 error_code ec;
533 auto const endpoint = parse_endpoint(ipValue.asString(), ec);
534 if (!ec)
535 redirectEndpoints.push_back(endpoint);
536 }
537
538 // Notify PeerFinder about the redirect; redirectEndpoints may be empty
539 overlay_.peerFinder().onRedirects(remote_endpoint_, redirectEndpoints);
540
// A redirect always ends this attempt.
541 return fail("processResponse: failed to connect to peer: redirected");
542 }
543
544 // Just because our peer selected a particular protocol version doesn't
545 // mean that it's acceptable to us. Check that it is:
546 std::optional<ProtocolVersion> negotiatedProtocol;
547
548 {
549 auto const pvs = parseProtocolVersions(response_["Upgrade"]);
550
// Exactly one version must be present, and it must be one we support.
551 if (pvs.size() == 1 && isProtocolSupported(pvs[0]))
552 negotiatedProtocol = pvs[0];
553
554 if (!negotiatedProtocol)
555 return fail(
556 "processResponse: Unable to negotiate protocol version");
557 }
558
559 auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
560 if (!sharedValue)
561 return shutdown(); // makeSharedValue logs
562
// Any std::exception thrown inside this block is reported through fail() as
// a handshake failure (see the catch at the end).
563 try
564 {
// NOTE(review): two arguments of this call (original lines 568-569) are
// missing from this listing.
565 auto const publicKey = verifyHandshake(
566 response_,
567 *sharedValue,
570 remote_endpoint_.address(),
571 app_);
572
573 usage_.setPublicKey(publicKey);
574
575 JLOG(journal_.debug())
576 << "Protocol: " << to_string(*negotiatedProtocol);
577 JLOG(journal_.info())
578 << "Public Key: " << toBase58(TokenType::NodePublic, publicKey);
579
// `member` holds the cluster name when the key belongs to the cluster.
580 auto const member = app_.cluster().member(publicKey);
581 if (member)
582 {
583 JLOG(journal_.info()) << "Cluster name: " << *member;
584 }
585
// Cluster membership is passed as the third (`reserved`) argument.
586 auto const result = overlay_.peerFinder().activate(
587 slot_, publicKey, member.has_value());
588 if (result != PeerFinder::Result::success)
589 {
// NOTE(review): the declaration of `ss` (original line 590) is missing from
// this listing.
591 ss << "Outbound Connect Attempt " << remote_endpoint_ << " "
592 << to_string(result);
593 return fail(ss.str());
594 }
595
596 if (!socket_.is_open())
597 return;
598
599 if (shutdown_)
600 return tryAsyncShutdown();
601
// Ownership of the stream, slot and response is moved into the new PeerImp;
// slot_ is therefore null afterwards.
602 auto const peer = std::make_shared<PeerImp>(
603 app_,
604 std::move(stream_ptr_),
605 read_buf_.data(),
606 std::move(slot_),
607 std::move(response_),
608 usage_,
609 publicKey,
610 *negotiatedProtocol,
611 id_,
612 overlay_);
613
614 overlay_.add_active(peer);
615 }
616 catch (std::exception const& e)
617 {
618 return fail(std::string("Handshake failure (") + e.what() + ")");
619 }
620}
621
622} // namespace ripple
T append(T... args)
T bind(T... args)
Unserialize a JSON document into a Value.
Definition json_reader.h:20
bool parse(std::string const &document, Value &root)
Read a Value from a JSON document.
Represents a JSON value.
Definition json_value.h:130
bool isArray() const
UInt size() const
Number of values in array or object.
bool isObject() const
bool isMember(char const *key) const
Return true if the object has a member named key.
A generic endpoint for log messages.
Definition Journal.h:41
Stream error() const
Definition Journal.h:327
Stream debug() const
Definition Journal.h:309
Stream info() const
Definition Journal.h:315
Stream trace() const
Severity stream access functions.
Definition Journal.h:303
Stream warn() const
Definition Journal.h:321
virtual Config & config()=0
virtual Cluster & cluster()=0
std::optional< std::string > member(PublicKey const &node) const
Determines whether a node belongs in the cluster.
Definition Cluster.cpp:19
bool VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE
Definition Config.h:229
bool LEDGER_REPLAY
Definition Config.h:204
bool TX_REDUCE_RELAY_ENABLE
Definition Config.h:239
bool COMPRESSION
Definition Config.h:201
static constexpr std::chrono::seconds connectTimeout
void stop() override
Stop the connection attempt.
boost::asio::ip::tcp::socket socket_type
void cancelTimer()
Cancel both global and step timers.
boost::system::error_code error_code
std::unique_ptr< stream_type > stream_ptr_
std::shared_ptr< PeerFinder::Slot > slot_
ConnectAttempt(Application &app, boost::asio::io_context &io_context, endpoint_type const &remote_endpoint, Resource::Consumer usage, shared_context const &context, Peer::id_t id, std::shared_ptr< PeerFinder::Slot > const &slot, beast::Journal journal, OverlayImpl &overlay)
Construct a new ConnectAttempt object.
ConnectionStep currentStep_
Resource::Consumer usage_
boost::asio::strand< boost::asio::io_context::executor_type > strand_
static std::string stepToString(ConnectionStep step)
void run()
Begin the connection attempt.
void onHandshake(error_code ec)
boost::asio::ip::tcp::endpoint endpoint_type
void onWrite(error_code ec)
void processResponse()
Process the HTTP upgrade response from peer.
void onTimer(error_code ec)
Handle timer expiration events.
void setTimer(ConnectionStep step)
Set timers for the specified connection step.
void onShutdown(error_code ec)
boost::beast::ssl_stream< middle_type > stream_type
boost::beast::multi_buffer read_buf_
void onConnect(error_code ec)
void fail(std::string const &reason)
void onRead(error_code ec)
static boost::asio::ip::tcp::endpoint parse_endpoint(std::string const &s, boost::system::error_code &ec)
beast::Journal const journal_
ConnectionStep
Represents the current phase of the connection establishment process.
boost::asio::basic_waitable_timer< std::chrono::steady_clock > timer_
endpoint_type remote_endpoint_
boost::asio::basic_waitable_timer< std::chrono::steady_clock > stepTimer_
PeerFinder::Manager & peerFinder()
void add_active(std::shared_ptr< PeerImp > const &peer)
static bool isPeerUpgrade(http_request_type const &request)
Setup const & setup() const
virtual bool onConnected(std::shared_ptr< Slot > const &slot, beast::IP::Endpoint const &local_endpoint)=0
Called when an outbound connection attempt succeeds.
virtual Config config()=0
Returns the configuration for the manager.
virtual void on_closed(std::shared_ptr< Slot > const &slot)=0
Called when the slot is closed.
virtual void onRedirects(boost::asio::ip::tcp::endpoint const &remote_address, std::vector< boost::asio::ip::tcp::endpoint > const &eps)=0
Called when we received redirect IPs from a busy peer.
virtual Result activate(std::shared_ptr< Slot > const &slot, PublicKey const &key, bool reserved)=0
Request an active slot type.
An endpoint that consumes resources.
Definition Consumer.h:17
void setPublicKey(PublicKey const &publicKey)
Definition Consumer.cpp:133
T count(T... args)
T is_same_v
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
std::string toBase58(AccountID const &v)
Convert AccountID to base58 checked string.
Definition AccountID.cpp:95
std::vector< ProtocolVersion > parseProtocolVersions(boost::beast::string_view const &value)
Parse a set of protocol versions.
void buildHandshake(boost::beast::http::fields &h, ripple::uint256 const &sharedValue, std::optional< std::uint32_t > networkID, beast::IP::Address public_ip, beast::IP::Address remote_ip, Application &app)
Insert fields headers necessary for upgrading the link to the peer protocol.
std::optional< uint256 > makeSharedValue(stream_type &ssl, beast::Journal journal)
Computes a shared value based on the SSL connection state.
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:611
auto makeRequest(bool crawlPublic, bool comprEnabled, bool ledgerReplayEnabled, bool txReduceRelayEnabled, bool vpReduceRelayEnabled) -> request_type
Make outbound http request.
bool isProtocolSupported(ProtocolVersion const &v)
Determine whether we support a specific protocol version.
PublicKey verifyHandshake(boost::beast::http::fields const &headers, ripple::uint256 const &sharedValue, std::optional< std::uint32_t > networkID, beast::IP::Address public_ip, beast::IP::Address remote, Application &app)
Validate header fields necessary for upgrading the link to the peer protocol.
STL namespace.
T push_back(T... args)
T reserve(T... args)
T str(T... args)
static IP::Endpoint from_asio(boost::asio::ip::address const &address)
static constexpr std::chrono::seconds tcpConnect
static constexpr std::chrono::seconds tlsShutdown
static constexpr std::chrono::seconds tlsHandshake
static constexpr std::chrono::seconds httpWrite
static constexpr std::chrono::seconds httpRead
beast::IP::Address public_ip
Definition Overlay.h:50
std::optional< std::uint32_t > networkID
Definition Overlay.h:53
bool peerPrivate
true if we want our IP address kept private.
T what(T... args)