rippled
Loading...
Searching...
No Matches
src/xrpld/peerfinder/detail/Logic.h
1#pragma once
2
3#include <xrpld/peerfinder/PeerfinderManager.h>
4#include <xrpld/peerfinder/detail/Bootcache.h>
5#include <xrpld/peerfinder/detail/Counts.h>
6#include <xrpld/peerfinder/detail/Fixed.h>
7#include <xrpld/peerfinder/detail/Handouts.h>
8#include <xrpld/peerfinder/detail/Livecache.h>
9#include <xrpld/peerfinder/detail/SlotImp.h>
10#include <xrpld/peerfinder/detail/Source.h>
11#include <xrpld/peerfinder/detail/Store.h>
12#include <xrpld/peerfinder/detail/iosformat.h>
13
14#include <xrpl/basics/Log.h>
15#include <xrpl/basics/contract.h>
16#include <xrpl/basics/random.h>
17#include <xrpl/beast/net/IPAddressConversion.h>
18#include <xrpl/beast/utility/WrappedSink.h>
19
20#include <algorithm>
21#include <functional>
22#include <map>
23#include <memory>
24#include <set>
25
26namespace xrpl {
27namespace PeerFinder {
28
33template <class Checker>
34class Logic
35{
36public:
37 // Maps remote endpoints to slots. Since a slot has a
38 // remote endpoint upon construction, this holds all counts.
39 //
41
46
48
49 // True if we are stopping.
50 bool stopping_ = false;
51
52 // The source we are currently fetching.
53 // This is used to cancel I/O during program exit.
55
56 // Configuration settings
58
59 // Slot counts and other aggregate statistics.
61
62 // A list of slots that should always be connected
64
65 // Livecache of endpoints learned from mtENDPOINTS messages
67
68 // Bootcache of addresses suitable for gaining initial connections
70
71 // Holds all counts
73
74 // The addresses (but not port) we are connected to. This includes
75 // outgoing connection attempts. Note that this set can contain
76 // duplicates (since the port is not set)
78
79 // Set of public keys belonging to active peers
81
82 // A list of dynamic sources to consult as a fallback
84
86
88
89 //--------------------------------------------------------------------------
90
91 Logic(clock_type& clock, Store& store, Checker& checker, beast::Journal journal)
92 : m_journal(journal)
93 , m_clock(clock)
94 , m_store(store)
95 , m_checker(checker)
96 , livecache_(m_clock, journal)
97 , bootcache_(store, m_clock, journal)
100 {
101 config({});
102 }
103
104 // Load persistent state information from the Store
105 //
106 void
108 {
111 }
112
119 void
121 {
123 stopping_ = true;
124 if (fetchSource_ != nullptr)
125 fetchSource_->cancel();
126 }
127
128 //--------------------------------------------------------------------------
129 //
130 // Manager
131 //
132 //--------------------------------------------------------------------------
133
134 void
135 config(Config const& c)
136 {
138 config_ = c;
140 }
141
142 Config
144 {
146 return config_;
147 }
148
149 void
154
155 void
157 {
159
160 if (addresses.empty())
161 {
162 JLOG(m_journal.info()) << "Could not resolve fixed slot '" << name << "'";
163 return;
164 }
165
166 for (auto const& remote_address : addresses)
167 {
168 if (remote_address.port() == 0)
169 {
170 Throw<std::runtime_error>("Port not specified for address:" + remote_address.to_string());
171 }
172
173 auto result(fixed_.emplace(
175
176 if (result.second)
177 {
178 JLOG(m_journal.debug()) << beast::leftw(18) << "Logic add fixed '" << name << "' at " << remote_address;
179 return;
180 }
181 }
182 }
183
184 //--------------------------------------------------------------------------
185
186 // Called when the Checker completes a connectivity test
187 void
189 beast::IP::Endpoint const& remoteAddress,
190 beast::IP::Endpoint const& checkedAddress,
191 boost::system::error_code ec)
192 {
193 if (ec == boost::asio::error::operation_aborted)
194 return;
195
197 auto const iter(slots_.find(remoteAddress));
198 if (iter == slots_.end())
199 {
200 // The slot disconnected before we finished the check
201 JLOG(m_journal.debug()) << beast::leftw(18) << "Logic tested " << checkedAddress
202 << " but the connection was closed";
203 return;
204 }
205
206 SlotImp& slot(*iter->second);
207 slot.checked = true;
208 slot.connectivityCheckInProgress = false;
209
210 beast::WrappedSink sink{m_journal.sink(), slot.prefix()};
211 beast::Journal journal{sink};
212
213 if (ec)
214 {
215 // VFALCO TODO Should we retry depending on the error?
216 slot.canAccept = false;
217 JLOG(journal.error()) << "Logic testing " << iter->first << " with error, " << ec.message();
218 bootcache_.on_failure(checkedAddress);
219 return;
220 }
221
222 slot.canAccept = true;
223 slot.set_listening_port(checkedAddress.port());
224 JLOG(journal.debug()) << "Logic testing " << checkedAddress << " succeeded";
225 }
226
227 //--------------------------------------------------------------------------
228
// Registers a slot for a newly accepted inbound connection.
//
// The connection is rejected when the per-address limit would be exceeded
// (checked for public addresses only) or when a slot already exists for
// remote_endpoint. Otherwise a SlotImp is created and recorded in slots_,
// connectedAddresses_ and counts_.
//
// Returns {slot, Result::success} on success, or an empty SlotImp::ptr paired
// with Result::ipLimitExceeded / Result::duplicatePeer on rejection.
//
// NOTE(review): source lines 229 and 235 are absent from this listing
// (presumably the return type and a lock acquisition) - confirm against the
// original file.
230 new_inbound_slot(beast::IP::Endpoint const& local_endpoint, beast::IP::Endpoint const& remote_endpoint)
231 {
// NOTE(review): "Logic accept" has no trailing space, so the remote endpoint
// is printed directly after the word "accept" (compare "Logic connect ").
232 JLOG(m_journal.debug()) << beast::leftw(18) << "Logic accept" << remote_endpoint << " on local "
233 << local_endpoint;
234
236
237 // Check for connection limit per address
238 if (is_public(remote_endpoint))
239 {
// connectedAddresses_ is keyed by address only, so this counts every
// existing connection (any port) from the same address.
240 auto const count = connectedAddresses_.count(remote_endpoint.address());
// "+ 1" accounts for the connection being accepted right now.
241 if (count + 1 > config_.ipLimit)
242 {
243 JLOG(m_journal.debug()) << beast::leftw(18) << "Logic dropping inbound " << remote_endpoint
244 << " because of ip limits.";
245 return {SlotImp::ptr(), Result::ipLimitExceeded};
246 }
247 }
248
249 // Check for duplicate connection
250 if (slots_.find(remote_endpoint) != slots_.end())
251 {
252 JLOG(m_journal.debug()) << beast::leftw(18) << "Logic dropping " << remote_endpoint
253 << " as duplicate incoming";
254 return {SlotImp::ptr(), Result::duplicatePeer};
255 }
256
257 // Create the slot
// The "fixed" flag is decided from the remote address alone (the
// address-only overload of fixed() ignores the port).
258 SlotImp::ptr const slot(
259 std::make_shared<SlotImp>(local_endpoint, remote_endpoint, fixed(remote_endpoint.address()), m_clock));
260 // Add slot to table
261 auto const result(slots_.emplace(slot->remote_endpoint(), slot));
262 // Remote address must not already exist
263 XRPL_ASSERT(
264 result.second,
265 "xrpl::PeerFinder::Logic::new_inbound_slot : remote endpoint "
266 "inserted");
267 // Add to the connected address list
268 connectedAddresses_.emplace(remote_endpoint.address());
269
270 // Update counts
271 counts_.add(*slot);
272
273 return {result.first->second, Result::success};
274 }
275
276 // Can't check for self-connect because we don't know the local endpoint
279 {
280 JLOG(m_journal.debug()) << beast::leftw(18) << "Logic connect " << remote_endpoint;
281
283
284 // Check for duplicate connection
285 if (slots_.find(remote_endpoint) != slots_.end())
286 {
287 JLOG(m_journal.debug()) << beast::leftw(18) << "Logic dropping " << remote_endpoint
288 << " as duplicate connect";
289 return {SlotImp::ptr(), Result::duplicatePeer};
290 }
291
292 // Create the slot
293 SlotImp::ptr const slot(std::make_shared<SlotImp>(remote_endpoint, fixed(remote_endpoint), m_clock));
294
295 // Add slot to table
296 auto const result = slots_.emplace(slot->remote_endpoint(), slot);
297 // Remote address must not already exist
298 XRPL_ASSERT(
299 result.second,
300 "xrpl::PeerFinder::Logic::new_outbound_slot : remote endpoint "
301 "inserted");
302
303 // Add to the connected address list
304 connectedAddresses_.emplace(remote_endpoint.address());
305
306 // Update counts
307 counts_.add(*slot);
308
309 return {result.first->second, Result::success};
310 }
311
// Called when the local endpoint of a slot's connection becomes known.
//
// Stores local_endpoint on the slot, rejects the connection if it turns out
// to be a connection to ourselves, and otherwise moves the slot to
// Slot::connected while keeping counts_ in step.
//
// Returns false when a self-connect is detected, true otherwise.
//
// NOTE(review): source line 320 is absent from this listing (presumably a
// lock acquisition) - confirm against the original file.
312 bool
313 onConnected(SlotImp::ptr const& slot, beast::IP::Endpoint const& local_endpoint)
314 {
// Journal whose output is prefixed with this slot's prefix().
315 beast::WrappedSink sink{m_journal.sink(), slot->prefix()};
316 beast::Journal journal{sink};
317
318 JLOG(journal.trace()) << "Logic connected on local " << local_endpoint;
319
321
322 // The object must exist in our table
323 XRPL_ASSERT(
324 slots_.find(slot->remote_endpoint()) != slots_.end(),
325 "xrpl::PeerFinder::Logic::onConnected : valid slot input");
326 // Assign the local endpoint now that it's known
327 slot->local_endpoint(local_endpoint);
328
329 // Check for self-connect by address
330 {
// slots_ is keyed by remote endpoint, so a hit on our own local endpoint
// means another slot in this table is the far end of this connection.
331 auto const iter(slots_.find(local_endpoint));
332 if (iter != slots_.end())
333 {
334 XRPL_ASSERT(
335 iter->second->local_endpoint() == slot->remote_endpoint(),
336 "xrpl::PeerFinder::Logic::onConnected : local and remote "
337 "endpoints do match");
338 JLOG(journal.warn()) << "Logic dropping as self connect";
339 return false;
340 }
341 }
342
343 // Update counts
// The slot is removed from counts_ before its state changes and added back
// afterwards, so it is counted under its new state.
344 counts_.remove(*slot);
345 slot->state(Slot::connected);
346 counts_.add(*slot);
347 return true;
348 }
349
// Attempts to promote a handshaked slot to the active state.
//
// Parameters:
//   slot     - a slot in state Slot::accept or Slot::connected that is
//              present in slots_ (both asserted below).
//   key      - the peer's public key; must not belong to an active peer.
//   reserved - whether the peer should be treated as reserved.
//
// Returns:
//   Result::duplicatePeer   - key is already in keys_.
//   Result::inboundDisabled - no room, slot is inbound and in_max() is 0.
//   Result::full            - no room for this slot otherwise.
//   Result::success         - the slot was activated.
//
// NOTE(review): source lines 359 and 412 are absent from this listing
// (presumably a lock acquisition and the statement guarded by the
// `iter == fixed_.end()` check) - confirm against the original file.
350 Result
351 activate(SlotImp::ptr const& slot, PublicKey const& key, bool reserved)
352 {
// Journal whose output is prefixed with this slot's prefix().
353 beast::WrappedSink sink{m_journal.sink(), slot->prefix()};
354 beast::Journal journal{sink};
355
356 JLOG(journal.debug()) << "Logic handshake " << slot->remote_endpoint() << " with "
357 << (reserved ? "reserved " : "") << "key " << key;
358
360
361 // The object must exist in our table
362 XRPL_ASSERT(
363 slots_.find(slot->remote_endpoint()) != slots_.end(),
364 "xrpl::PeerFinder::Logic::activate : valid slot input");
365 // Must be accepted or connected
366 XRPL_ASSERT(
367 slot->state() == Slot::accept || slot->state() == Slot::connected,
368 "xrpl::PeerFinder::Logic::activate : valid slot state");
369
370 // Check for duplicate connection by key
371 if (keys_.find(key) != keys_.end())
372 return Result::duplicatePeer;
373
374 // If the peer belongs to a cluster or is reserved,
375 // update the slot to reflect that.
// The slot is taken out of counts_ while the flag changes and re-added
// afterwards, so it is counted under its updated properties.
376 counts_.remove(*slot);
377 slot->reserved(reserved);
378 counts_.add(*slot);
379
380 // See if we have an open space for this slot
381 if (!counts_.can_activate(*slot))
382 {
// The outbound handshake itself worked, so the address is still recorded
// as a success in the bootcache even though the slot cannot be activated.
383 if (!slot->inbound())
384 bootcache_.on_success(slot->remote_endpoint());
385 if (slot->inbound() && counts_.in_max() == 0)
386 return Result::inboundDisabled;
387 return Result::full;
388 }
389
390 // Set the key right before adding to the map, otherwise we might
391 // assert later when erasing the key.
392 slot->public_key(key);
393 {
394 [[maybe_unused]] bool const inserted = keys_.insert(key).second;
395 // Public key must not already exist
396 XRPL_ASSERT(inserted, "xrpl::PeerFinder::Logic::activate : public key inserted");
397 }
398
399 // Change state and update counts
400 counts_.remove(*slot);
401 slot->activate(m_clock.now());
402 counts_.add(*slot);
403
404 if (!slot->inbound())
405 bootcache_.on_success(slot->remote_endpoint());
406
407 // Mark fixed slot success
// Only outbound slots are looked up here: fixed_ is searched by the full
// remote endpoint, whereas inbound slots are flagged fixed by address alone.
408 if (slot->fixed() && !slot->inbound())
409 {
410 auto iter(fixed_.find(slot->remote_endpoint()));
411 if (iter == fixed_.end())
413 "PeerFinder::Logic::activate(): remote_endpoint "
414 "missing from fixed_");
415
416 iter->second.success(m_clock.now());
417 JLOG(journal.trace()) << "Logic fixed success";
418 }
419
420 return Result::success;
421 }
422
429 {
431 RedirectHandouts h(slot);
433 handout(&h, (&h) + 1, livecache_.hops.begin(), livecache_.hops.end());
434 return std::move(h.list());
435 }
436
440 // VFALCO TODO This should add the returned addresses to the
441 // squelch list in one go once the list is built,
442 // rather than having each module add to the squelch list.
445 {
447
449
450 // Count how many more outbound attempts to make
451 //
452 auto needed(counts_.attempts_needed());
453 if (needed == 0)
454 return none;
455
456 ConnectHandouts h(needed, m_squelches);
457
458 // Make sure we don't connect to already-connected entries.
459 for (auto const& s : slots_)
460 {
461 auto const result(m_squelches.insert(s.second->remote_endpoint().address()));
462 if (!result.second)
463 m_squelches.touch(result.first);
464 }
465
466 // 1. Use Fixed if:
467 // Fixed active count is below fixed count AND
468 // ( There are eligible fixed addresses to try OR
469 // Any outbound attempts are in progress)
470 //
471 if (counts_.fixed_active() < fixed_.size())
472 {
473 get_fixed(needed, h.list(), m_squelches);
474
475 if (!h.list().empty())
476 {
477 JLOG(m_journal.debug()) << beast::leftw(18) << "Logic connect " << h.list().size() << " fixed";
478 return h.list();
479 }
480
481 if (counts_.attempts() > 0)
482 {
483 JLOG(m_journal.debug()) << beast::leftw(18) << "Logic waiting on " << counts_.attempts() << " attempts";
484 return none;
485 }
486 }
487
488 // Only proceed if auto connect is enabled and we
489 // have less than the desired number of outbound slots
490 //
492 return none;
493
494 // 2. Use Livecache if:
495 // There are any entries in the cache OR
496 // Any outbound attempts are in progress
497 //
498 {
500 handout(&h, (&h) + 1, livecache_.hops.rbegin(), livecache_.hops.rend());
501 if (!h.list().empty())
502 {
503 JLOG(m_journal.debug()) << beast::leftw(18) << "Logic connect " << h.list().size() << " live "
504 << ((h.list().size() > 1) ? "endpoints" : "endpoint");
505 return h.list();
506 }
507 else if (counts_.attempts() > 0)
508 {
509 JLOG(m_journal.debug()) << beast::leftw(18) << "Logic waiting on " << counts_.attempts() << " attempts";
510 return none;
511 }
512 }
513
514 /* 3. Bootcache refill
515 If the Bootcache is empty, try to get addresses from the current
516 set of Sources and add them into the Bootstrap cache.
517
518 Pseudocode:
519 If ( domainNames.count() > 0 AND (
520 unusedBootstrapIPs.count() == 0
521 OR activeNameResolutions.count() > 0) )
522 ForOneOrMore (DomainName that hasn't been resolved recently)
523 Contact DomainName and add entries to the
524 unusedBootstrapIPs return;
525 */
526
527 // 4. Use Bootcache if:
528 // There are any entries we haven't tried lately
529 //
530 for (auto iter(bootcache_.begin()); !h.full() && iter != bootcache_.end(); ++iter)
531 h.try_insert(*iter);
532
533 if (!h.list().empty())
534 {
535 JLOG(m_journal.debug()) << beast::leftw(18) << "Logic connect " << h.list().size() << " boot "
536 << ((h.list().size() > 1) ? "addresses" : "address");
537 return h.list();
538 }
539
540 // If we get here we are stuck
541 return none;
542 }
543
546 {
548
550
551 clock_type::time_point const now = m_clock.now();
552 if (m_whenBroadcast <= now)
553 {
555
556 {
557 // build list of active slots
559 slots.reserve(slots_.size());
560 std::for_each(slots_.cbegin(), slots_.cend(), [&slots](Slots::value_type const& value) {
561 if (value.second->state() == Slot::active)
562 slots.emplace_back(value.second);
563 });
564 std::shuffle(slots.begin(), slots.end(), default_prng());
565
566 // build target vector
567 targets.reserve(slots.size());
569 slots.cbegin(), slots.cend(), [&targets](SlotImp::ptr const& slot) { targets.emplace_back(slot); });
570 }
571
572 /* VFALCO NOTE
573 This is a temporary measure. Once we know our own IP
574 address, the correct solution is to put it into the Livecache
575 at hops 0, and go through the regular handout path. This way
576 we avoid handing our address out too frequently, which this code
577 suffers from.
578 */
579 // Add an entry for ourselves if:
580 // 1. We want incoming
581 // 2. We have slots
582 // 3. We haven't failed the firewalled test
583 //
584 if (config_.wantIncoming && counts_.in_max() > 0)
585 {
586 Endpoint ep;
587 ep.hops = 0;
588 // we use the unspecified (0) address here because the value is
589 // irrelevant to recipients. When peers receive an endpoint
590 // with 0 hops, they use the socket remote_addr instead of the
591 // value in the message. Furthermore, since the address value
592 // is ignored, the type/version (ipv4 vs ipv6) doesn't matter
593 // either. ipv6 has a slightly more compact string
594 // representation of 0, so use that for self entries.
596 for (auto& t : targets)
597 t.insert(ep);
598 }
599
600 // build sequence of endpoints by hops
602 handout(targets.begin(), targets.end(), livecache_.hops.begin(), livecache_.hops.end());
603
604 // broadcast
605 for (auto const& t : targets)
606 {
607 SlotImp::ptr const& slot = t.slot();
608 auto const& list = t.list();
609 beast::WrappedSink sink{m_journal.sink(), slot->prefix()};
610 beast::Journal journal{sink};
611 JLOG(journal.trace()) << "Logic sending " << list.size()
612 << ((list.size() == 1) ? " endpoint" : " endpoints");
613 result.push_back(std::make_pair(slot, list));
614 }
615
617 }
618
619 return result;
620 }
621
622 void
624 {
626
627 // Expire the Livecache
629
630 // Expire the recent cache in each slot
631 for (auto const& entry : slots_)
632 entry.second->expire();
633
634 // Expire the recent attempts table
636
638 }
639
640 //--------------------------------------------------------------------------
641
642 // Validate and clean up the list that we received from the slot.
643 void
645 {
646 bool neighbor(false);
647 for (auto iter = list.begin(); iter != list.end();)
648 {
649 Endpoint& ep(*iter);
650
651 // Enforce hop limit
652 if (ep.hops > Tuning::maxHops)
653 {
654 JLOG(m_journal.debug()) << beast::leftw(18) << "Endpoints drop " << ep.address << " for excess hops "
655 << ep.hops;
656 iter = list.erase(iter);
657 continue;
658 }
659
660 // See if we are directly connected
661 if (ep.hops == 0)
662 {
663 if (!neighbor)
664 {
665 // Fill in our neighbors remote address
666 neighbor = true;
667 ep.address = slot->remote_endpoint().at_port(ep.address.port());
668 }
669 else
670 {
671 JLOG(m_journal.debug()) << beast::leftw(18) << "Endpoints drop " << ep.address << " for extra self";
672 iter = list.erase(iter);
673 continue;
674 }
675 }
676
677 // Discard invalid addresses
678 if (!is_valid_address(ep.address))
679 {
680 JLOG(m_journal.debug()) << beast::leftw(18) << "Endpoints drop " << ep.address << " as invalid";
681 iter = list.erase(iter);
682 continue;
683 }
684
685 // Filter duplicates
686 if (std::any_of(list.begin(), iter, [ep](Endpoints::value_type const& other) {
687 return ep.address == other.address;
688 }))
689 {
690 JLOG(m_journal.debug()) << beast::leftw(18) << "Endpoints drop " << ep.address << " as duplicate";
691 iter = list.erase(iter);
692 continue;
693 }
694
695 // Increment hop count on the incoming message, so
696 // we store it at the hop count we will send it at.
697 //
698 ++ep.hops;
699
700 ++iter;
701 }
702 }
703
704 void
706 {
707 beast::WrappedSink sink{m_journal.sink(), slot->prefix()};
708 beast::Journal journal{sink};
709
710 // If we're sent too many endpoints, sample them at random:
712 {
713 std::shuffle(list.begin(), list.end(), default_prng());
715 }
716
717 JLOG(journal.trace()) << "Endpoints contained " << list.size() << ((list.size() > 1) ? " entries" : " entry");
718
720
721 // The object must exist in our table
722 XRPL_ASSERT(
723 slots_.find(slot->remote_endpoint()) != slots_.end(),
724 "xrpl::PeerFinder::Logic::on_endpoints : valid slot input");
725
726 // Must be handshaked!
727 XRPL_ASSERT(slot->state() == Slot::active, "xrpl::PeerFinder::Logic::on_endpoints : valid slot state");
728
730
731 // Limit how often we accept new endpoints
732 if (slot->whenAcceptEndpoints > now)
733 return;
734
735 preprocess(slot, list);
736
737 for (auto const& ep : list)
738 {
739 XRPL_ASSERT(ep.hops, "xrpl::PeerFinder::Logic::on_endpoints : nonzero hops");
740
741 slot->recent.insert(ep.address, ep.hops);
742
743 // Note hops has been incremented, so 1
744 // means a directly connected neighbor.
745 //
746 if (ep.hops == 1)
747 {
748 if (slot->connectivityCheckInProgress)
749 {
750 JLOG(journal.debug()) << "Logic testing " << ep.address << " already in progress";
751 continue;
752 }
753
754 if (!slot->checked)
755 {
756 // Mark that a check for this slot is now in progress.
757 slot->connectivityCheckInProgress = true;
758
759 // Test the slot's listening port before
760 // adding it to the livecache for the first time.
761 //
763 ep.address,
764 std::bind(
765 &Logic::checkComplete, this, slot->remote_endpoint(), ep.address, std::placeholders::_1));
766
767 // Note that we simply discard the first Endpoint
768 // that the neighbor sends when we perform the
769 // listening test. They will just send us another
770 // one in a few seconds.
771
772 continue;
773 }
774
775 // If they failed the test then skip the address
776 if (!slot->canAccept)
777 continue;
778 }
779
780 // We only add to the livecache if the neighbor passed the
781 // listening test, else we silently drop neighbor endpoint
782 // since their listening port is misconfigured.
783 //
784 livecache_.insert(ep);
785 bootcache_.insert(ep.address);
786 }
787
788 slot->whenAcceptEndpoints = now + Tuning::secondsPerMessage;
789 }
790
791 //--------------------------------------------------------------------------
792
// Removes a slot from the bookkeeping tables: slots_, keys_ (only when the
// slot has a public key), connectedAddresses_ and counts_.
//
// NOTE(review): source lines 800, 813, 824 and 828 are absent from this
// listing. The orphaned string literals suggest each `if (iter == ...end())`
// raises an error, and line 828 is presumably the erase from
// connectedAddresses_ - confirm against the original file.
793 void
794 remove(SlotImp::ptr const& slot)
795 {
796 {
797 auto const iter = slots_.find(slot->remote_endpoint());
798 // The slot must exist in the table
799 if (iter == slots_.end())
801 "PeerFinder::Logic::remove(): remote_endpoint "
802 "missing from slots_");
803
804 // Remove from slot by IP table
805 slots_.erase(iter);
806 }
807 // Remove the key if present
// public_key() is optional-like; it is compared against std::nullopt here
// and dereferenced only when set.
808 if (slot->public_key() != std::nullopt)
809 {
810 auto const iter = keys_.find(*slot->public_key());
811 // Key must exist
812 if (iter == keys_.end())
814 "PeerFinder::Logic::remove(): public_key missing "
815 "from keys_");
816
817 keys_.erase(iter);
818 }
819 // Remove from connected address table
820 {
// Lookup is by address only; find() yields a single entry for this address.
821 auto const iter(connectedAddresses_.find(slot->remote_endpoint().address()));
822 // Address must exist
823 if (iter == connectedAddresses_.end())
825 "PeerFinder::Logic::remove(): remote_endpoint "
826 "address missing from connectedAddresses_");
827
829 }
830
831 // Update counts
832 counts_.remove(*slot);
833 }
834
835 void
837 {
839
840 remove(slot);
841
842 beast::WrappedSink sink{m_journal.sink(), slot->prefix()};
843 beast::Journal journal{sink};
844
845 // Mark fixed slot failure
846 if (slot->fixed() && !slot->inbound() && slot->state() != Slot::active)
847 {
848 auto iter(fixed_.find(slot->remote_endpoint()));
849 if (iter == fixed_.end())
851 "PeerFinder::Logic::on_closed(): remote_endpoint "
852 "missing from fixed_");
853
854 iter->second.failure(m_clock.now());
855 JLOG(journal.debug()) << "Logic fixed failed";
856 }
857
858 // Do state specific bookkeeping
859 switch (slot->state())
860 {
861 case Slot::accept:
862 JLOG(journal.trace()) << "Logic accept failed";
863 break;
864
865 case Slot::connect:
866 case Slot::connected:
867 bootcache_.on_failure(slot->remote_endpoint());
868 // VFALCO TODO If the address exists in the ephemeral/live
869 // endpoint livecache then we should mark the
870 // failure
871 // as if it didn't pass the listening test. We should also
872 // avoid propagating the address.
873 break;
874
875 case Slot::active:
876 JLOG(journal.trace()) << "Logic close";
877 break;
878
879 case Slot::closing:
880 JLOG(journal.trace()) << "Logic finished";
881 break;
882
883 // LCOV_EXCL_START
884 default:
885 UNREACHABLE(
886 "xrpl::PeerFinder::Logic::on_closed : invalid slot "
887 "state");
888 break;
889 // LCOV_EXCL_STOP
890 }
891 }
892
893 void
895 {
897
898 bootcache_.on_failure(slot->remote_endpoint());
899 }
900
901 // Insert a set of redirect IP addresses into the Bootcache
902 template <class FwdIter>
903 void
904 onRedirects(FwdIter first, FwdIter last, boost::asio::ip::tcp::endpoint const& remote_address);
905
906 //--------------------------------------------------------------------------
907
908 // Returns `true` if the address matches a fixed slot address
909 // Must have the lock held
910 bool
911 fixed(beast::IP::Endpoint const& endpoint) const
912 {
913 for (auto const& entry : fixed_)
914 if (entry.first == endpoint)
915 return true;
916 return false;
917 }
918
919 // Returns `true` if the address matches a fixed slot address
920 // Note that this does not use the port information in the IP::Endpoint
921 // Must have the lock held
922 bool
923 fixed(beast::IP::Address const& address) const
924 {
925 for (auto const& entry : fixed_)
926 if (entry.first.address() == address)
927 return true;
928 return false;
929 }
930
931 //--------------------------------------------------------------------------
932 //
933 // Connection Strategy
934 //
935 //--------------------------------------------------------------------------
936
938 template <class Container>
939 void
940 get_fixed(std::size_t needed, Container& c, typename ConnectHandouts::Squelches& squelches)
941 {
942 auto const now(m_clock.now());
943 for (auto iter = fixed_.begin(); needed && iter != fixed_.end(); ++iter)
944 {
945 auto const& address(iter->first.address());
946 if (iter->second.when() <= now && squelches.find(address) == squelches.end() &&
947 std::none_of(slots_.cbegin(), slots_.cend(), [address](Slots::value_type const& v) {
948 return address == v.first.address();
949 }))
950 {
951 squelches.insert(iter->first.address());
952 c.push_back(iter->first);
953 --needed;
954 }
955 }
956 }
957
958 //--------------------------------------------------------------------------
959
960 void
962 {
963 fetch(source);
964 }
965
966 void
968 {
969 m_sources.push_back(source);
970 }
971
972 //--------------------------------------------------------------------------
973 //
974 // Bootcache livecache sources
975 //
976 //--------------------------------------------------------------------------
977
978 // Add a set of addresses.
979 // Returns the number of addresses added.
980 //
981 int
983 {
984 int count(0);
986 for (auto addr : list)
987 {
988 if (bootcache_.insertStatic(addr))
989 ++count;
990 }
991 return count;
992 }
993
994 // Fetch bootcache addresses from the specified source.
995 void
997 {
998 Source::Results results;
999
1000 {
1001 {
1003 if (stopping_)
1004 return;
1005 fetchSource_ = source;
1006 }
1007
1008 // VFALCO NOTE The fetch is synchronous,
1009 // not sure if that's a good thing.
1010 //
1011 source->fetch(results, m_journal);
1012
1013 {
1015 if (stopping_)
1016 return;
1017 fetchSource_ = nullptr;
1018 }
1019 }
1020
1021 if (!results.error)
1022 {
1023 int const count(addBootcacheAddresses(results.addresses));
1024 JLOG(m_journal.info()) << beast::leftw(18) << "Logic added " << count << " new "
1025 << ((count == 1) ? "address" : "addresses") << " from " << source->name();
1026 }
1027 else
1028 {
1029 JLOG(m_journal.error()) << beast::leftw(18) << "Logic failed "
1030 << "'" << source->name() << "' fetch, " << results.error.message();
1031 }
1032 }
1033
1034 //--------------------------------------------------------------------------
1035 //
1036 // Endpoint message handling
1037 //
1038 //--------------------------------------------------------------------------
1039
1040 // Returns true if the IP::Endpoint contains no invalid data.
1041 bool
1043 {
1044 if (is_unspecified(address))
1045 return false;
1046 if (!is_public(address))
1047 return false;
1048 if (address.port() == 0)
1049 return false;
1050 return true;
1051 }
1052
1053 //--------------------------------------------------------------------------
1054 //
1055 // PropertyStream
1056 //
1057 //--------------------------------------------------------------------------
1058
1059 void
1061 {
1062 for (auto const& entry : slots)
1063 {
1065 SlotImp const& slot(*entry.second);
1066 if (slot.local_endpoint() != std::nullopt)
1067 item["local_address"] = to_string(*slot.local_endpoint());
1068 item["remote_address"] = to_string(slot.remote_endpoint());
1069 if (slot.inbound())
1070 item["inbound"] = "yes";
1071 if (slot.fixed())
1072 item["fixed"] = "yes";
1073 if (slot.reserved())
1074 item["reserved"] = "yes";
1075
1076 item["state"] = stateString(slot.state());
1077 }
1078 }
1079
1080 void
1082 {
1084
1085 // VFALCO NOTE These ugly casts are needed because
1086 // of how std::size_t is declared on some linuxes
1087 //
1088 map["bootcache"] = std::uint32_t(bootcache_.size());
1089 map["fixed"] = std::uint32_t(fixed_.size());
1090
1091 {
1092 beast::PropertyStream::Set child("peers", map);
1093 writeSlots(child, slots_);
1094 }
1095
1096 {
1097 beast::PropertyStream::Map child("counts", map);
1098 counts_.onWrite(child);
1099 }
1100
1101 {
1102 beast::PropertyStream::Map child("config", map);
1103 config_.onWrite(child);
1104 }
1105
1106 {
1107 beast::PropertyStream::Map child("livecache", map);
1108 livecache_.onWrite(child);
1109 }
1110
1111 {
1112 beast::PropertyStream::Map child("bootcache", map);
1113 bootcache_.onWrite(child);
1114 }
1115 }
1116
1117 //--------------------------------------------------------------------------
1118 //
1119 // Diagnostics
1120 //
1121 //--------------------------------------------------------------------------
1122
// Returns a read-only reference to the slot counters held in counts_.
1123 Counts const&
1124 counts() const
1125 {
1126 return counts_;
1127 }
1128
1129 static std::string
1131 {
1132 switch (state)
1133 {
1134 case Slot::accept:
1135 return "accept";
1136 case Slot::connect:
1137 return "connect";
1138 case Slot::connected:
1139 return "connected";
1140 case Slot::active:
1141 return "active";
1142 case Slot::closing:
1143 return "closing";
1144 default:
1145 break;
1146 };
1147 return "?";
1148 }
1149};
1150
1151//------------------------------------------------------------------------------
1152
1153template <class Checker>
1154template <class FwdIter>
1155void
1156Logic<Checker>::onRedirects(FwdIter first, FwdIter last, boost::asio::ip::tcp::endpoint const& remote_address)
1157{
1158 std::lock_guard _(lock_);
1159 std::size_t n = 0;
1160 for (; first != last && n < Tuning::maxRedirects; ++first, ++n)
1161 bootcache_.insert(beast::IPAddressConversion::from_asio(*first));
1162 if (n > 0)
1163 {
1164 JLOG(m_journal.trace()) << beast::leftw(18) << "Logic add " << n << " redirect IPs from " << remote_address;
1165 }
1166}
1167
1168} // namespace PeerFinder
1169} // namespace xrpl
T any_of(T... args)
T cbegin(T... args)
T bind(T... args)
A version-independent IP address and port combination.
Definition IPEndpoint.h:18
Address const & address() const
Returns the address portion of this endpoint.
Definition IPEndpoint.h:55
Endpoint at_port(Port port) const
Returns a new Endpoint with a different port.
Definition IPEndpoint.h:48
Port port() const
Returns the port number on the endpoint.
Definition IPEndpoint.h:41
A generic endpoint for log messages.
Definition Journal.h:40
Stream error() const
Definition Journal.h:318
Stream debug() const
Definition Journal.h:300
Sink & sink() const
Returns the Sink associated with this Journal.
Definition Journal.h:269
Stream info() const
Definition Journal.h:306
Wraps a Journal::Sink to prefix its output with a string.
Definition WrappedSink.h:14
typename Clock::time_point time_point
virtual time_point now() const =0
Returns the current time.
Associative container where each element is also indexed by time.
auto insert(value_type const &value) -> typename std::enable_if<!maybe_multi, std::pair< iterator, bool > >::type
void touch(beast::detail::aged_container_iterator< is_const, Iterator > pos)
Stores IP addresses useful for gaining initial connections.
Definition Bootcache.h:34
void on_failure(beast::IP::Endpoint const &endpoint)
Called when an outbound connection attempt fails to handshake.
bool insert(beast::IP::Endpoint const &endpoint)
Add a newly-learned address to the cache.
Definition Bootcache.cpp:85
void periodicActivity()
Stores the cache in the persistent database on a timer.
void on_success(beast::IP::Endpoint const &endpoint)
Called when an outbound connection handshake completes.
void onWrite(beast::PropertyStream::Map &map)
Write the cache state to the property stream.
const_iterator begin() const
IP::Endpoint iterators that traverse in decreasing valence.
Definition Bootcache.cpp:33
void load()
Load the persisted data from the Store into the container.
Definition Bootcache.cpp:66
bool insertStatic(beast::IP::Endpoint const &endpoint)
Add a statically configured address to the cache.
Definition Bootcache.cpp:98
const_iterator end() const
Definition Bootcache.cpp:45
map_type::size_type size() const
Returns the number of entries in the cache.
Definition Bootcache.cpp:27
Tests remote listening sockets to make sure they are connectable.
Definition Checker.h:19
void async_connect(beast::IP::Endpoint const &endpoint, Handler &&handler)
Performs an async connection test on the specified endpoint.
Definition Checker.h:174
Receives handouts for making automatic connections.
Definition Handouts.h:246
bool try_insert(beast::IP::Endpoint const &endpoint)
Definition Handouts.h:306
Manages the count of available connections for the various slots.
Definition Counts.h:14
void onWrite(beast::PropertyStream::Map &map)
Output statistics.
Definition Counts.h:208
void remove(Slot const &s)
Removes the slot state and properties from the slot counts.
Definition Counts.h:43
std::size_t attempts() const
Returns the number of outbound connection attempts.
Definition Counts.h:77
int out_max() const
Returns the total number of outbound slots.
Definition Counts.h:84
bool can_activate(Slot const &s) const
Returns true if the slot can become active.
Definition Counts.h:50
int in_max() const
Returns the total number of inbound slots.
Definition Counts.h:146
std::size_t fixed_active() const
Returns the number of active fixed connections.
Definition Counts.h:107
void add(Slot const &s)
Adds the slot state and properties to the slot counts.
Definition Counts.h:36
void onConfig(Config const &config)
Called when the config is set or changed.
Definition Counts.h:116
int out_active() const
Returns the number of outbound peers assigned an open slot.
Definition Counts.h:93
std::size_t attempts_needed() const
Returns the number of attempts needed to bring us to the max.
Definition Counts.h:68
void shuffle()
Shuffle each hop list.
Definition Livecache.h:443
The Livecache holds the short-lived relayed Endpoint messages.
Definition Livecache.h:170
class xrpl::PeerFinder::Livecache::hops_t hops
void expire()
Erase entries whose time has expired.
Definition Livecache.h:361
void insert(Endpoint const &ep)
Creates or updates an existing Element based on a new message.
Definition Livecache.h:380
void onWrite(beast::PropertyStream::Map &map)
Output statistics.
Definition Livecache.h:421
The Logic for maintaining the list of Slot addresses.
void get_fixed(std::size_t needed, Container &c, typename ConnectHandouts::Squelches &squelches)
Adds eligible Fixed addresses for outbound attempts.
void on_closed(SlotImp::ptr const &slot)
std::vector< Endpoint > redirect(SlotImp::ptr const &slot)
Return a list of addresses suitable for redirection.
std::vector< std::pair< std::shared_ptr< Slot >, std::vector< Endpoint > > > buildEndpointsForPeers()
void addFixedPeer(std::string const &name, std::vector< beast::IP::Endpoint > const &addresses)
void onRedirects(FwdIter first, FwdIter last, boost::asio::ip::tcp::endpoint const &remote_address)
void checkComplete(beast::IP::Endpoint const &remoteAddress, beast::IP::Endpoint const &checkedAddress, boost::system::error_code ec)
void addFixedPeer(std::string const &name, beast::IP::Endpoint const &ep)
void preprocess(SlotImp::ptr const &slot, Endpoints &list)
static std::string stateString(Slot::State state)
std::pair< SlotImp::ptr, Result > new_inbound_slot(beast::IP::Endpoint const &local_endpoint, beast::IP::Endpoint const &remote_endpoint)
void fetch(std::shared_ptr< Source > const &source)
Result activate(SlotImp::ptr const &slot, PublicKey const &key, bool reserved)
void addSource(std::shared_ptr< Source > const &source)
void on_endpoints(SlotImp::ptr const &slot, Endpoints list)
void writeSlots(beast::PropertyStream::Set &set, Slots const &slots)
bool onConnected(SlotImp::ptr const &slot, beast::IP::Endpoint const &local_endpoint)
bool is_valid_address(beast::IP::Endpoint const &address)
std::vector< std::shared_ptr< Source > > m_sources
std::vector< beast::IP::Endpoint > autoconnect()
Create new outbound connection attempts as needed.
Logic(clock_type &clock, Store &store, Checker &checker, beast::Journal journal)
void remove(SlotImp::ptr const &slot)
void addStaticSource(std::shared_ptr< Source > const &source)
std::multiset< beast::IP::Address > connectedAddresses_
std::pair< SlotImp::ptr, Result > new_outbound_slot(beast::IP::Endpoint const &remote_endpoint)
std::map< beast::IP::Endpoint, Fixed > fixed_
void on_failure(SlotImp::ptr const &slot)
bool fixed(beast::IP::Endpoint const &endpoint) const
int addBootcacheAddresses(IPAddresses const &list)
void onWrite(beast::PropertyStream::Map &map)
bool fixed(beast::IP::Address const &address) const
Receives handouts for redirecting a connection.
Definition Handouts.h:78
std::vector< Endpoint > & list()
Definition Handouts.h:100
std::optional< beast::IP::Endpoint > const & local_endpoint() const override
The local endpoint of the socket, when known.
Definition SlotImp.h:60
bool fixed() const override
Returns true if this is a fixed connection.
Definition SlotImp.h:36
void set_listening_port(std::uint16_t port)
Definition SlotImp.h:87
std::shared_ptr< SlotImp > ptr
Definition SlotImp.h:17
bool reserved() const override
Returns true if this is a reserved connection.
Definition SlotImp.h:42
beast::IP::Endpoint const & remote_endpoint() const override
The remote endpoint of the socket.
Definition SlotImp.h:54
std::string prefix() const
Definition SlotImp.h:72
State state() const override
Returns the state of the connection.
Definition SlotImp.h:48
bool inbound() const override
Returns true if this is an inbound connection.
Definition SlotImp.h:30
Abstract persistence for PeerFinder data.
Definition Store.h:8
A public key.
Definition PublicKey.h:42
T count(T... args)
T emplace(T... args)
T empty(T... args)
T end(T... args)
T erase(T... args)
T find(T... args)
T for_each(T... args)
T forward_as_tuple(T... args)
T is_same_v
T make_pair(T... args)
T make_tuple(T... args)
boost::asio::ip::address_v6 AddressV6
Definition IPAddressV6.h:10
boost::asio::ip::address Address
Definition IPAddress.h:19
std::enable_if< is_aged_container< AgedContainer >::value, std::size_t >::type expire(AgedContainer &c, std::chrono::duration< Rep, Period > const &age)
Expire aged container items past the specified age.
std::chrono::seconds constexpr recentAttemptDuration(60)
std::chrono::seconds constexpr secondsPerMessage(151)
std::string_view to_string(Result result) noexcept
Converts a Result enum value to its string representation.
Result
Possible results from activating a slot.
void handout(TargetFwdIter first, TargetFwdIter last, SeqFwdIter seq_first, SeqFwdIter seq_last)
Distributes objects to targets according to business rules.
Definition Handouts.h:46
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section. If the named value is not found or doesn't parse as a T,...
void LogicError(std::string const &how) noexcept
Called when faulty logic causes a broken invariant.
beast::xor_shift_engine & default_prng()
Return the default random engine.
T piecewise_construct
T push_back(T... args)
T shuffle(T... args)
T ref(T... args)
T reserve(T... args)
T resize(T... args)
T size(T... args)
static IP::Endpoint from_asio(boost::asio::ip::address const &address)
Left justifies a field at the specified width.
Definition iosformat.h:14
PeerFinder configuration settings.
int ipLimit
Limit how many incoming connections we allow per IP.
void onWrite(beast::PropertyStream::Map &map)
Write the configuration into a property stream.
bool wantIncoming
true if we want to accept incoming connections.
bool autoConnect
true if we want to establish connections automatically.
std::uint16_t listeningPort
The listening port number.
Describes a connectable peer address along with some metadata.
The results of a fetch.
Definition Source.h:23
boost::system::error_code error
Definition Source.h:27