xrpld
Loading...
Searching...
No Matches
src/xrpld/peerfinder/detail/Logic.h
1#pragma once
2
3#include <xrpld/peerfinder/PeerfinderManager.h>
4#include <xrpld/peerfinder/detail/Bootcache.h>
5#include <xrpld/peerfinder/detail/Counts.h>
6#include <xrpld/peerfinder/detail/Fixed.h>
7#include <xrpld/peerfinder/detail/Handouts.h>
8#include <xrpld/peerfinder/detail/Livecache.h>
9#include <xrpld/peerfinder/detail/SlotImp.h>
10#include <xrpld/peerfinder/detail/Source.h>
11#include <xrpld/peerfinder/detail/Store.h>
12#include <xrpld/peerfinder/detail/iosformat.h>
13
14#include <xrpl/basics/Log.h>
15#include <xrpl/basics/contract.h>
16#include <xrpl/basics/random.h>
17#include <xrpl/beast/net/IPAddressConversion.h>
18#include <xrpl/beast/utility/WrappedSink.h>
19
20#include <algorithm>
21#include <functional>
22#include <map>
23#include <memory>
24#include <set>
25
26namespace xrpl::PeerFinder {
27
32template <class Checker>
33class Logic
34{
35public:
36 // Maps remote endpoints to slots. Since a slot has a
37 // remote endpoint upon construction, this holds all slots.
38 //
40
45
47
48 // True if we are stopping.
49 bool stopping = false;
50
51 // The source we are currently fetching.
52 // This is used to cancel I/O during program exit.
54
55private:
56 // Configuration settings
58
59 // Slot counts and other aggregate statistics.
61
62 // A list of slots that should always be connected
64
65public:
66 // Livecache of endpoints received in mtENDPOINTS messages
68
69 // Bootcache of addresses suitable for gaining initial connections
71
72 // Holds all counts
74
75 // The addresses (but not port) we are connected to. This includes
76 // outgoing connection attempts. Note that this set can contain
77 // duplicates (since the port is not set)
79
80 // Set of public keys belonging to active peers
82
83 // A list of dynamic sources to consult as a fallback
85
87
89
90 //--------------------------------------------------------------------------
91public:
104
105 // Load persistent state information from the Store
106 //
// Holds `lock` for the duration of the call while `bootcache` loads
// its contents.
107 void
109 {
110 std::scoped_lock const _(lock);
111 bootcache.load();
112 }
113
// Begin shutdown: set the `stopping` flag and, if a source is recorded
// in `fetchSource`, cancel it. Both steps happen while `lock` is held.
120 void
122 {
123 std::scoped_lock const _(lock);
124 stopping = true;
// `fetchSource` is assigned while a source fetch is in progress.
125 if (fetchSource != nullptr)
126 fetchSource->cancel();
127 }
128
129 //--------------------------------------------------------------------------
130 //
131 // Manager
132 //
133 //--------------------------------------------------------------------------
134
135 void
136 config(Config const& c)
137 {
138 std::scoped_lock const _(lock);
139 config_ = c;
140 counts_.onConfig(config_);
141 }
142
// Returns a copy of the current configuration, read while holding `lock`.
143 Config
145 {
146 std::scoped_lock const _(lock);
147 return config_;
148 }
149
150 void
155
156 void
158 {
159 std::scoped_lock const _(lock);
160
161 if (addresses.empty())
162 {
163 JLOG(journal.info()) << "Could not resolve fixed slot '" << name << "'";
164 return;
165 }
166
167 for (auto const& remoteAddress : addresses)
168 {
169 if (remoteAddress.port() == 0)
170 {
172 "Port not specified for address:" + remoteAddress.toString());
173 }
174
175 auto result(fixed_.emplace(
177 std::forward_as_tuple(remoteAddress),
179
180 if (result.second)
181 {
182 JLOG(journal.debug())
183 << beast::Leftw(18) << "Logic add fixed '" << name << "' at " << remoteAddress;
184 return;
185 }
186 }
187 }
188
189 //--------------------------------------------------------------------------
190
191 // Called when the Checker completes a connectivity test
//
// remoteAddress  - key used to look the slot up in `slots`.
// checkedAddress - the endpoint that was probed; on success its port is
//                  recorded as the slot's listening port.
// ec             - outcome of the probe.
//
// An aborted operation is ignored without touching any state. Otherwise
// the slot (if it still exists) is marked as checked and `canAccept`
// records whether the probe succeeded. A failed probe is also reported
// to the bootcache.
192 void
194 beast::IP::Endpoint const& remoteAddress,
195 beast::IP::Endpoint const& checkedAddress,
196 boost::system::error_code ec)
197 {
// Checked before the lock is taken: an aborted test changes nothing.
198 if (ec == boost::asio::error::operation_aborted)
199 return;
200
201 std::scoped_lock const _(lock);
202 auto const iter(slots.find(remoteAddress));
203 if (iter == slots.end())
204 {
205 // The slot disconnected before we finished the check
206 JLOG(journal.debug()) << beast::Leftw(18) << "Logic tested " << checkedAddress
207 << " but the connection was closed";
208 return;
209 }
210
// The test has finished, whatever its outcome.
211 SlotImp& slot(*iter->second);
212 slot.checked = true;
213 slot.connectivityCheckInProgress = false;
214
// From here on `journal` names a local that shadows the member and
// prefixes each log line with the slot's prefix.
215 beast::WrappedSink sink{journal.sink(), slot.prefix()};
216 beast::Journal const journal{sink};
217
218 if (ec)
219 {
220 // VFALCO TODO Should we retry depending on the error?
221 slot.canAccept = false;
222 JLOG(journal.error()) << "Logic testing " << iter->first << " with error, "
223 << ec.message();
224 bootcache.onFailure(checkedAddress);
225 return;
226 }
227
228 slot.canAccept = true;
229 slot.setListeningPort(checkedAddress.port());
230 JLOG(journal.debug()) << "Logic testing " << checkedAddress << " succeeded";
231 }
232
233 //--------------------------------------------------------------------------
234
237 beast::IP::Endpoint const& localEndpoint,
238 beast::IP::Endpoint const& remoteEndpoint)
239 {
240 JLOG(journal.debug()) << beast::Leftw(18) << "Logic accept" << remoteEndpoint
241 << " on local " << localEndpoint;
242
243 std::scoped_lock const _(lock);
244
245 // Check for connection limit per address
246 if (isPublic(remoteEndpoint))
247 {
248 auto const count = connectedAddresses.count(remoteEndpoint.address());
249 if (count + 1 > config_.ipLimit)
250 {
251 JLOG(journal.debug()) << beast::Leftw(18) << "Logic dropping inbound "
252 << remoteEndpoint << " because of ip limits.";
253 return {SlotImp::ptr(), Result::IpLimitExceeded};
254 }
255 }
256
257 // Check for duplicate connection
258 if (slots.contains(remoteEndpoint))
259 {
260 JLOG(journal.debug()) << beast::Leftw(18) << "Logic dropping " << remoteEndpoint
261 << " as duplicate incoming";
262 return {SlotImp::ptr(), Result::DuplicatePeer};
263 }
264
265 // Create the slot
266 SlotImp::ptr const slot(
268 localEndpoint, remoteEndpoint, fixed(remoteEndpoint.address()), clock));
269 // Add slot to table
270 auto const result(slots.emplace(slot->remoteEndpoint(), slot));
271 // Remote address must not already exist
272 XRPL_ASSERT(
273 result.second,
274 "xrpl::PeerFinder::Logic::new_inbound_slot : remote endpoint "
275 "inserted");
276 // Add to the connected address list
277 connectedAddresses.emplace(remoteEndpoint.address());
278
279 // Update counts
280 counts_.add(*slot);
281
282 return {result.first->second, Result::Success};
283 }
284
// Creates and registers a slot for a connection to `remoteEndpoint`.
//
// Returns {null slot, Result::DuplicatePeer} when `slots` already holds
// an entry for the same remote endpoint; otherwise the new slot paired
// with Result::Success. On success the slot is added to `slots`, its
// address to `connectedAddresses`, and it is counted in `counts_`.
//
285 // Can't check for self-connect because we don't know the local endpoint
288 {
289 JLOG(journal.debug()) << beast::Leftw(18) << "Logic connect " << remoteEndpoint;
290
291 std::scoped_lock const _(lock);
292
293 // Check for duplicate connection
294 if (slots.contains(remoteEndpoint))
295 {
296 JLOG(journal.debug()) << beast::Leftw(18) << "Logic dropping " << remoteEndpoint
297 << " as duplicate connect";
298 return {SlotImp::ptr(), Result::DuplicatePeer};
299 }
300
301 // Create the slot
// The fixed flag is decided by the full endpoint (address and port).
302 SlotImp::ptr const slot(
303 std::make_shared<SlotImp>(remoteEndpoint, fixed(remoteEndpoint), clock));
304
305 // Add slot to table
306 auto const result = slots.emplace(slot->remoteEndpoint(), slot);
307 // Remote address must not already exist
308 XRPL_ASSERT(
309 result.second,
310 "xrpl::PeerFinder::Logic::new_outbound_slot : remote endpoint "
311 "inserted");
312
313 // Add to the connected address list
314 connectedAddresses.emplace(remoteEndpoint.address());
315
316 // Update counts
317 counts_.add(*slot);
318
319 return {result.first->second, Result::Success};
320 }
321
322 bool
323 onConnected(SlotImp::ptr const& slot, beast::IP::Endpoint const& localEndpoint)
324 {
325 beast::WrappedSink sink{journal.sink(), slot->prefix()};
326 beast::Journal const journal{sink};
327
328 JLOG(journal.trace()) << "Logic connected on local " << localEndpoint;
329
330 std::scoped_lock const _(lock);
331
332 // The object must exist in our table
333 XRPL_ASSERT(
334 slots.contains(slot->remoteEndpoint()),
335 "xrpl::PeerFinder::Logic::onConnected : valid slot input");
336 // Assign the local endpoint now that it's known
337 slot->localEndpoint(localEndpoint);
338
339 // Check for self-connect by address
340 {
341 auto const iter(slots.find(localEndpoint));
342 if (iter != slots.end())
343 {
344 XRPL_ASSERT(
345 iter->second->localEndpoint() == slot->remoteEndpoint(),
346 "xrpl::PeerFinder::Logic::onConnected : local and remote "
347 "endpoints do match");
348 JLOG(journal.warn()) << "Logic dropping as self connect";
349 return false;
350 }
351 }
352
353 // Update counts
354 counts_.remove(*slot);
355 slot->state(Slot::State::Connected);
356 counts_.add(*slot);
357 return true;
358 }
359
360 Result
361 activate(SlotImp::ptr const& slot, PublicKey const& key, bool reserved)
362 {
363 beast::WrappedSink sink{journal.sink(), slot->prefix()};
364 beast::Journal const journal{sink};
365
366 JLOG(journal.debug()) << "Logic handshake " << slot->remoteEndpoint() << " with "
367 << (reserved ? "reserved " : "") << "key " << key;
368
369 std::scoped_lock const _(lock);
370
371 // The object must exist in our table
372 XRPL_ASSERT(
373 slots.contains(slot->remoteEndpoint()),
374 "xrpl::PeerFinder::Logic::activate : valid slot input");
375 // Must be accepted or connected
376 XRPL_ASSERT(
377 slot->state() == Slot::State::Accept || slot->state() == Slot::State::Connected,
378 "xrpl::PeerFinder::Logic::activate : valid slot state");
379
380 // Check for duplicate connection by key
381 if (keys.contains(key))
382 return Result::DuplicatePeer;
383
384 // If the peer belongs to a cluster or is reserved,
385 // update the slot to reflect that.
386 counts_.remove(*slot);
387 slot->reserved(reserved);
388 counts_.add(*slot);
389
390 // See if we have an open space for this slot
391 if (!counts_.canActivate(*slot))
392 {
393 if (!slot->inbound())
394 bootcache.onSuccess(slot->remoteEndpoint());
395 if (slot->inbound() && counts_.inMax() == 0)
396 return Result::InboundDisabled;
397 return Result::Full;
398 }
399
400 // Set the key right before adding to the map, otherwise we might
401 // assert later when erasing the key.
402 slot->publicKey(key);
403 {
404 [[maybe_unused]] bool const inserted = keys.insert(key).second;
405 // Public key must not already exist
406 XRPL_ASSERT(inserted, "xrpl::PeerFinder::Logic::activate : public key inserted");
407 }
408
409 // Change state and update counts
410 counts_.remove(*slot);
411 slot->activate(clock.now());
412 counts_.add(*slot);
413
414 if (!slot->inbound())
415 bootcache.onSuccess(slot->remoteEndpoint());
416
417 // Mark fixed slot success
418 if (slot->fixed() && !slot->inbound())
419 {
420 auto iter(fixed_.find(slot->remoteEndpoint()));
421 if (iter == fixed_.end())
422 {
424 "PeerFinder::Logic::activate(): remote_endpoint "
425 "missing from fixed_");
426 }
427
428 iter->second.success(clock.now());
429 JLOG(journal.trace()) << "Logic fixed success";
430 }
431
432 return Result::Success;
433 }
434
441 {
442 std::scoped_lock const _(lock);
443 RedirectHandouts h(slot);
444 livecache.hops.shuffle();
445 handout(&h, (&h) + 1, livecache.hops.begin(), livecache.hops.end());
446 return std::move(h.list());
447 }
448
452 // VFALCO TODO This should add the returned addresses to the
453 // squelch list in one go once the list is built,
454 // rather than having each module add to the squelch list.
457 {
459
460 std::scoped_lock const _(lock);
461
462 // Count how many more outbound attempts to make
463 //
464 auto needed(counts_.attemptsNeeded());
465 if (needed == 0)
466 return none;
467
468 ConnectHandouts h(needed, squelches);
469
470 // Make sure we don't connect to already-connected entries.
471 for (auto const& s : slots)
472 {
473 auto const result(squelches.insert(s.second->remoteEndpoint().address()));
474 if (!result.second)
475 squelches.touch(result.first);
476 }
477
478 // 1. Use Fixed if:
479 // Fixed active count is below fixed count AND
480 // ( There are eligible fixed addresses to try OR
481 // Any outbound attempts are in progress)
482 //
483 if (counts_.fixedActive() < fixed_.size())
484 {
485 getFixed(needed, h.list(), squelches);
486
487 if (!h.list().empty())
488 {
489 JLOG(journal.debug())
490 << beast::Leftw(18) << "Logic connect " << h.list().size() << " fixed";
491 return h.list();
492 }
493
494 if (counts_.attempts() > 0)
495 {
496 JLOG(journal.debug())
497 << beast::Leftw(18) << "Logic waiting on " << counts_.attempts() << " attempts";
498 return none;
499 }
500 }
501
502 // Only proceed if auto connect is enabled and we
503 // have less than the desired number of outbound slots
504 //
505 if (!config_.autoConnect || counts_.outActive() >= counts_.outMax())
506 return none;
507
508 // 2. Use Livecache if:
509 // There are any entries in the cache OR
510 // Any outbound attempts are in progress
511 //
512 {
513 livecache.hops.shuffle();
514 handout(&h, (&h) + 1, livecache.hops.rbegin(), livecache.hops.rend());
515 if (!h.list().empty())
516 {
517 JLOG(journal.debug())
518 << beast::Leftw(18) << "Logic connect " << h.list().size() << " live "
519 << ((h.list().size() > 1) ? "endpoints" : "endpoint");
520 return h.list();
521 }
522 if (counts_.attempts() > 0)
523 {
524 JLOG(journal.debug())
525 << beast::Leftw(18) << "Logic waiting on " << counts_.attempts() << " attempts";
526 return none;
527 }
528 }
529
530 /* 3. Bootcache refill
531 If the Bootcache is empty, try to get addresses from the current
532 set of Sources and add them into the Bootstrap cache.
533
534 Pseudocode:
535 If ( domainNames.count() > 0 AND (
536 unusedBootstrapIPs.count() == 0
537 OR activeNameResolutions.count() > 0) )
538 ForOneOrMore (DomainName that hasn't been resolved recently)
539 Contact DomainName and add entries to the
540 unusedBootstrapIPs return;
541 */
542
543 // 4. Use Bootcache if:
544 // There are any entries we haven't tried lately
545 //
546 for (auto iter(bootcache.begin()); !h.full() && iter != bootcache.end(); ++iter)
547 h.tryInsert(*iter);
548
549 if (!h.list().empty())
550 {
551 JLOG(journal.debug()) << beast::Leftw(18) << "Logic connect " << h.list().size()
552 << " boot " << ((h.list().size() > 1) ? "addresses" : "address");
553 return h.list();
554 }
555
556 // If we get here we are stuck
557 return none;
558 }
559
562 {
564
565 std::scoped_lock const _(lock);
566
567 clock_type::time_point const now = clock.now();
568 if (whenBroadcast <= now)
569 {
571
572 {
573 // build list of active slots
574 std::vector<SlotImp::ptr> activeSlots;
575 activeSlots.reserve(slots.size());
576 std::ranges::for_each(slots, [&activeSlots](Slots::value_type const& value) {
577 if (value.second->state() == Slot::State::Active)
578 activeSlots.emplace_back(value.second);
579 });
580 std::shuffle(activeSlots.begin(), activeSlots.end(), defaultPrng());
581
582 // build target vector
583 targets.reserve(activeSlots.size());
584 std::ranges::for_each(activeSlots, [&targets](SlotImp::ptr const& slot) {
585 targets.emplace_back(slot);
586 });
587 }
588
589 /* VFALCO NOTE
590 This is a temporary measure. Once we know our own IP
591 address, the correct solution is to put it into the Livecache
592 at hops 0, and go through the regular handout path. This way
593 we avoid handing our address out too frequently, which this code
594 suffers from.
595 */
596 // Add an entry for ourselves if:
597 // 1. We want incoming
598 // 2. We have slots
599 // 3. We haven't failed the firewalled test
600 //
601 if (config_.wantIncoming && counts_.inMax() > 0)
602 {
603 Endpoint ep;
604 ep.hops = 0;
605 // we use the unspecified (0) address here because the value is
606 // irrelevant to recipients. When peers receive an endpoint
607 // with 0 hops, they use the socket remote_addr instead of the
608 // value in the message. Furthermore, since the address value
609 // is ignored, the type/version (ipv4 vs ipv6) doesn't matter
610 // either. ipv6 has a slightly more compact string
611 // representation of 0, so use that for self entries.
612 ep.address =
614 for (auto& t : targets)
615 t.insert(ep);
616 }
617
618 // build sequence of endpoints by hops
619 livecache.hops.shuffle();
620 handout(targets.begin(), targets.end(), livecache.hops.begin(), livecache.hops.end());
621
622 // broadcast
623 for (auto const& t : targets)
624 {
625 SlotImp::ptr const& slot = t.slot();
626 auto const& list = t.list();
627 beast::WrappedSink sink{journal.sink(), slot->prefix()};
628 beast::Journal const journal{sink};
629 JLOG(journal.trace()) << "Logic sending " << list.size()
630 << ((list.size() == 1) ? " endpoint" : " endpoints");
631 result.emplace_back(slot, list);
632 }
633
635 }
636
637 return result;
638 }
639
640 void
642 {
643 std::scoped_lock const _(lock);
644
645 // Expire the Livecache
646 livecache.expire();
647
648 // Expire the recent cache in each slot
649 for (auto const& entry : slots)
650 entry.second->expire();
651
652 // Expire the recent attempts table
654
655 bootcache.periodicActivity();
656 }
657
658 //--------------------------------------------------------------------------
659
660 // Validate and clean up the list that we received from the slot.
//
// Edits `list` in place. An entry is erased when it exceeds the hop
// limit, is a second zero-hop entry, fails address validation (only when
// `config_.verifyEndpoints` is set), or repeats the address of an entry
// kept earlier in the list. Every entry that survives has its hop count
// incremented by one.
661 void
663 {
// Set once the first zero-hop entry has been seen.
664 bool neighbor(false);
// No increment in the loop header: each path either erases (which yields
// the next iterator) or advances explicitly at the bottom.
665 for (auto iter = list.begin(); iter != list.end();)
666 {
667 Endpoint& ep(*iter);
668
669 // Enforce hop limit
670 if (ep.hops > Tuning::kMaxHops)
671 {
672 JLOG(journal.debug()) << beast::Leftw(18) << "Endpoints drop " << ep.address
673 << " for excess hops " << ep.hops;
674 iter = list.erase(iter);
675 continue;
676 }
677
678 // See if we are directly connected
679 if (ep.hops == 0)
680 {
681 if (!neighbor)
682 {
683 // Fill in our neighbors remote address
// Only the port from the message is kept; the address is taken from the
// slot's remote endpoint.
684 neighbor = true;
685 ep.address = slot->remoteEndpoint().atPort(ep.address.port());
686 }
687 else
688 {
689 JLOG(journal.debug())
690 << beast::Leftw(18) << "Endpoints drop " << ep.address << " for extra self";
691 iter = list.erase(iter);
692 continue;
693 }
694 }
695
696 // Discard invalid addresses
697 if (config_.verifyEndpoints && !isValidAddress(ep.address))
698 {
699 JLOG(journal.debug())
700 << beast::Leftw(18) << "Endpoints drop " << ep.address << " as invalid";
701 iter = list.erase(iter);
702 continue;
703 }
704
705 // Filter duplicates
// Only entries before `iter` (already accepted) are compared.
// NOTE(review): the lambda captures `ep` by value, copying the Endpoint
// for each entry examined.
706 if (std::any_of(list.begin(), iter, [ep](Endpoints::value_type const& other) {
707 return ep.address == other.address;
708 }))
709 {
710 JLOG(journal.debug())
711 << beast::Leftw(18) << "Endpoints drop " << ep.address << " as duplicate";
712 iter = list.erase(iter);
713 continue;
714 }
715
716 // Increment hop count on the incoming message, so
717 // we store it at the hop count we will send it at.
718 //
719 ++ep.hops;
720
721 ++iter;
722 }
723 }
724
725 void
727 {
728 beast::WrappedSink sink{journal.sink(), slot->prefix()};
729 beast::Journal const journal{sink};
730
731 // If we're sent too many endpoints, sample them at random:
733 {
734 std::shuffle(list.begin(), list.end(), defaultPrng());
736 }
737
738 JLOG(journal.trace()) << "Endpoints contained " << list.size()
739 << ((list.size() > 1) ? " entries" : " entry");
740
741 std::scoped_lock const _(lock);
742
743 // The object must exist in our table
744 XRPL_ASSERT(
745 slots.contains(slot->remoteEndpoint()),
746 "xrpl::PeerFinder::Logic::onEndpoints : valid slot input");
747
748 // Must be handshaked!
749 XRPL_ASSERT(
750 slot->state() == Slot::State::Active,
751 "xrpl::PeerFinder::Logic::onEndpoints : valid slot state");
752
753 clock_type::time_point const now(clock.now());
754
755 // Limit how often we accept new endpoints
756 if (slot->whenAcceptEndpoints > now)
757 return;
758
759 preprocess(slot, list);
760
761 for (auto const& ep : list)
762 {
763 XRPL_ASSERT(ep.hops, "xrpl::PeerFinder::Logic::onEndpoints : nonzero hops");
764
765 slot->recent.insert(ep.address, ep.hops);
766
767 // Note hops has been incremented, so 1
768 // means a directly connected neighbor.
769 //
770 if (ep.hops == 1)
771 {
772 if (slot->connectivityCheckInProgress)
773 {
774 JLOG(journal.debug())
775 << "Logic testing " << ep.address << " already in progress";
776 continue;
777 }
778
779 if (!slot->checked)
780 {
781 // Mark that a check for this slot is now in progress.
782 slot->connectivityCheckInProgress = true;
783
784 // Test the slot's listening port before
785 // adding it to the livecache for the first time.
786 //
787 checker.asyncConnect(
788 ep.address,
789 std::bind(
791 this,
792 slot->remoteEndpoint(),
793 ep.address,
794 std::placeholders::_1));
795
796 // Note that we simply discard the first Endpoint
797 // that the neighbor sends when we perform the
798 // listening test. They will just send us another
799 // one in a few seconds.
800
801 continue;
802 }
803
804 // If they failed the test then skip the address
805 if (!slot->canAccept)
806 continue;
807 }
808
809 // We only add to the livecache if the neighbor passed the
810 // listening test, else we silently drop neighbor endpoint
811 // since their listening port is misconfigured.
812 //
813 livecache.insert(ep);
814 bootcache.insert(ep.address);
815 }
816
817 slot->whenAcceptEndpoints = now + Tuning::kSecondsPerMessage;
818 }
819
820 //--------------------------------------------------------------------------
821
822 void
823 remove(SlotImp::ptr const& slot)
824 {
825 {
826 auto const iter = slots.find(slot->remoteEndpoint());
827 // The slot must exist in the table
828 if (iter == slots.end())
829 {
831 "PeerFinder::Logic::remove(): remote_endpoint "
832 "missing from slots_");
833 }
834
835 // Remove from slot by IP table
836 slots.erase(iter);
837 }
838 // Remove the key if present
839 if (slot->publicKey() != std::nullopt)
840 {
841 auto const iter = keys.find(*slot->publicKey());
842 // Key must exist
843 if (iter == keys.end())
844 {
846 "PeerFinder::Logic::remove(): public_key missing "
847 "from keys_");
848 }
849
850 keys.erase(iter);
851 }
852 // Remove from connected address table
853 {
854 auto const iter(connectedAddresses.find(slot->remoteEndpoint().address()));
855 // Address must exist
856 if (iter == connectedAddresses.end())
857 {
859 "PeerFinder::Logic::remove(): remote_endpoint "
860 "address missing from connectedAddresses_");
861 }
862
863 connectedAddresses.erase(iter);
864 }
865
866 // Update counts
867 counts_.remove(*slot);
868 }
869
870 void
872 {
873 std::scoped_lock const _(lock);
874
875 remove(slot);
876
877 beast::WrappedSink sink{journal.sink(), slot->prefix()};
878 beast::Journal const journal{sink};
879
880 // Mark fixed slot failure
881 if (slot->fixed() && !slot->inbound() && slot->state() != Slot::State::Active)
882 {
883 auto iter(fixed_.find(slot->remoteEndpoint()));
884 if (iter == fixed_.end())
885 {
887 "PeerFinder::Logic::on_closed(): remote_endpoint "
888 "missing from fixed_");
889 }
890
891 iter->second.failure(clock.now());
892 JLOG(journal.debug()) << "Logic fixed failed";
893 }
894
895 // Do state specific bookkeeping
896 switch (slot->state())
897 {
899 JLOG(journal.trace()) << "Logic accept failed";
900 break;
901
904 bootcache.onFailure(slot->remoteEndpoint());
905 // VFALCO TODO If the address exists in the ephemeral/live
906 // endpoint livecache then we should mark the
907 // failure
908 // as if it didn't pass the listening test. We should also
909 // avoid propagating the address.
910 break;
911
913 JLOG(journal.trace()) << "Logic close";
914 break;
915
917 JLOG(journal.trace()) << "Logic finished";
918 break;
919
920 // LCOV_EXCL_START
921 default:
922 UNREACHABLE(
923 "xrpl::PeerFinder::Logic::on_closed : invalid slot "
924 "state");
925 break;
926 // LCOV_EXCL_STOP
927 }
928 }
929
// Reports a failure for the slot's remote endpoint to the bootcache,
// while holding `lock`. No other state is modified here.
930 void
932 {
933 std::scoped_lock const _(lock);
934
935 bootcache.onFailure(slot->remoteEndpoint());
936 }
937
938 // Insert a set of redirect IP addresses into the Bootcache
939 template <class FwdIter>
940 void
941 onRedirects(FwdIter first, FwdIter last, boost::asio::ip::tcp::endpoint const& remoteAddress);
942
943 //--------------------------------------------------------------------------
944
945 // Returns `true` if the address matches a fixed slot address
946 // Must have the lock held
947 bool
948 fixed(beast::IP::Endpoint const& endpoint) const
949 {
950 for (auto const& entry : fixed_)
951 {
952 if (entry.first == endpoint)
953 return true;
954 }
955 return false;
956 }
957
958 // Returns `true` if the address matches a fixed slot address
959 // Note that this does not use the port information in the IP::Endpoint
960 // Must have the lock held
961 bool
962 fixed(beast::IP::Address const& address) const
963 {
964 for (auto const& entry : fixed_)
965 {
966 if (entry.first.address() == address)
967 return true;
968 }
969 return false;
970 }
971
972 //--------------------------------------------------------------------------
973 //
974 // Connection Strategy
975 //
976 //--------------------------------------------------------------------------
977
979 template <class Container>
980 void
982 {
983 auto const now(clock.now());
984 for (auto iter = fixed_.begin(); needed && iter != fixed_.end(); ++iter)
985 {
986 auto const& address(iter->first.address());
987 if (iter->second.when() <= now && squelches.find(address) == squelches.end() &&
988 std::ranges::none_of(slots, [address](Slots::value_type const& v) {
989 return address == v.first.address();
990 }))
991 {
992 squelches.insert(iter->first.address());
993 c.push_back(iter->first);
994 --needed;
995 }
996 }
997 }
998
999 //--------------------------------------------------------------------------
1000
// Fetches from `source` immediately; the source is not appended to
// `sources`.
1001 void
1003 {
1004 fetch(source);
1005 }
1006
// Appends `source` to `sources`. Nothing is fetched here.
// NOTE(review): `lock` is not taken in this function; confirm callers
// serialize access to `sources`.
1007 void
1009 {
1010 sources.push_back(source);
1011 }
1012
1013 //--------------------------------------------------------------------------
1014 //
1015 // Bootcache sources
1016 //
1017 //--------------------------------------------------------------------------
1018
1019 // Add a set of addresses.
1020 // Returns the number of addresses added.
1021 //
// Each address in `list` is offered to `bootcache.insertStatic()`; only
// the calls that return true are counted. `lock` is held for the whole
// loop.
1022 int
1024 {
1025 int count(0);
1026 std::scoped_lock const _(lock);
1027 for (auto const& addr : list)
1028 {
1029 if (bootcache.insertStatic(addr))
1030 ++count;
1031 }
1032 return count;
1033 }
1034
1035 // Fetch bootcache addresses from the specified source.
//
// `lock` is held only while `stopping` is checked and `fetchSource` is
// updated, not across the call to `source->fetch()`. On success the
// fetched addresses are added to the bootcache; on error the failure is
// logged.
1036 void
1038 {
1039 Source::Results results;
1040
1041 {
1042 {
// Publish the source being fetched so it can be cancelled from another
// call while the fetch runs.
1043 std::scoped_lock const _(lock);
1044 if (stopping)
1045 return;
1046 fetchSource = source;
1047 }
1048
1049 // VFALCO NOTE The fetch is synchronous,
1050 // not sure if that's a good thing.
1051 //
1052 source->fetch(results, journal);
1053
1054 {
// If a stop was requested during the fetch, the results are discarded.
// Note that `fetchSource` is left set on that path.
1055 std::scoped_lock const _(lock);
1056 if (stopping)
1057 return;
1058 fetchSource = nullptr;
1059 }
1060 }
1061
1062 if (!results.error)
1063 {
1064 int const count(addBootcacheAddresses(results.addresses));
1065 JLOG(journal.info()) << beast::Leftw(18) << "Logic added " << count << " new "
1066 << ((count == 1) ? "address" : "addresses") << " from "
1067 << source->name();
1068 }
1069 else
1070 {
1071 JLOG(journal.error()) << beast::Leftw(18) << "Logic failed "
1072 << "'" << source->name() << "' fetch, "
1073 << results.error.message();
1074 }
1075 }
1076
1077 //--------------------------------------------------------------------------
1078 //
1079 // Endpoint message handling
1080 //
1081 //--------------------------------------------------------------------------
1082
1083 // Returns true if the IP::Endpoint contains no invalid data.
//
// An endpoint is rejected when it is unspecified, is a loopback address,
// is not public, or has port 0. The checks run in that order and the
// first one that fails decides the result.
1084 bool
1086 {
1087 if (isUnspecified(address))
1088 return false;
1089 if (isLoopback(address))
1090 return false;
1091 if (!isPublic(address))
1092 return false;
1093 if (address.port() == 0)
1094 return false;
1095 return true;
1096 }
1097
1098 //--------------------------------------------------------------------------
1099 //
1100 // PropertyStream
1101 //
1102 //--------------------------------------------------------------------------
1103
1104 void
1106 {
1107 for (auto const& entry : slots)
1108 {
1110 SlotImp const& slot(*entry.second);
1111 if (slot.localEndpoint() != std::nullopt)
1112 item["local_address"] = to_string(*slot.localEndpoint());
1113 item["remote_address"] = to_string(slot.remoteEndpoint());
1114 if (slot.inbound())
1115 item["inbound"] = "yes";
1116 if (slot.fixed())
1117 item["fixed"] = "yes";
1118 if (slot.reserved())
1119 item["reserved"] = "yes";
1120
1121 item["state"] = stateString(slot.state());
1122 }
1123 }
1124
// Writes a diagnostic snapshot into `map` while holding `lock`: the
// bootcache and fixed-slot sizes as scalars, a "peers" set with one item
// per slot, and child maps filled in by `counts_`, `config_`,
// `livecache` and `bootcache`.
1125 void
1127 {
1128 std::scoped_lock const _(lock);
1129
1130 // VFALCO NOTE These ugly casts are needed because
1131 // of how std::size_t is declared on some linuxes
1132 //
1133 map["bootcache"] = std::uint32_t(bootcache.size());
1134 map["fixed"] = std::uint32_t(fixed_.size());
1135
// Each child below lives in its own scope, so it is destroyed before the
// next one is created.
1136 {
1137 beast::PropertyStream::Set child("peers", map);
1138 writeSlots(child, slots);
1139 }
1140
1141 {
1142 beast::PropertyStream::Map child("counts", map);
1143 counts_.onWrite(child);
1144 }
1145
1146 {
1147 beast::PropertyStream::Map child("config", map);
1148 config_.onWrite(child);
1149 }
1150
1151 {
1152 beast::PropertyStream::Map child("livecache", map);
1153 livecache.onWrite(child);
1154 }
1155
// NOTE(review): the key "bootcache" is used both for the scalar size
// above and for this child map; confirm the duplicate key is intended.
1156 {
1157 beast::PropertyStream::Map child("bootcache", map);
1158 bootcache.onWrite(child);
1159 }
1160 }
1161
1162 //--------------------------------------------------------------------------
1163 //
1164 // Diagnostics
1165 //
1166 //--------------------------------------------------------------------------
1167
1168 Counts const&
1169 counts() const
1170 {
1171 return counts_;
1172 }
1173
1174 static std::string
1176 {
1177 switch (state)
1178 {
1180 return "accept";
1182 return "connect";
1184 return "connected";
1186 return "active";
1188 return "closing";
1189 default:
1190 break;
1191 };
1192 return "?";
1193 }
1194};
1195
1196//------------------------------------------------------------------------------
1197
1198template <class Checker>
1199template <class FwdIter>
1200void
1202 FwdIter first,
1203 FwdIter last,
1204 boost::asio::ip::tcp::endpoint const& remoteAddress)
1205{
1206 std::scoped_lock const _(lock);
1207 std::size_t n = 0;
1208 for (; first != last && n < Tuning::kMaxRedirects; ++first, ++n)
1210 if (n > 0)
1211 {
1212 JLOG(journal.trace()) << beast::Leftw(18) << "Logic add " << n << " redirect IPs from "
1213 << remoteAddress;
1214 }
1215}
1216
1217} // namespace xrpl::PeerFinder
T any_of(T... args)
T begin(T... args)
T bind(T... args)
std::chrono::steady_clock::time_point time_point
A version-independent IP address and port combination.
Definition IPEndpoint.h:17
Address const & address() const
Returns the address portion of this endpoint.
Definition IPEndpoint.h:54
Endpoint atPort(Port port) const
Returns a new Endpoint with a different port.
Definition IPEndpoint.h:47
Port port() const
Returns the port number on the endpoint.
Definition IPEndpoint.h:40
A generic endpoint for log messages.
Definition Journal.h:38
Wraps a Journal::Sink to prefix its output with a string.
Definition WrappedSink.h:16
Stores IP addresses useful for gaining initial connections.
Definition Bootcache.h:33
Tests remote listening sockets to make sure they are connectable.
Definition Checker.h:18
Receives handouts for making automatic connections.
Definition Handouts.h:247
bool tryInsert(beast::IP::Endpoint const &endpoint)
Definition Handouts.h:308
beast::aged_set< beast::IP::Address > Squelches
Definition Handouts.h:251
Manages the count of available connections for the various slots.
Definition Counts.h:16
The Livecache holds the short-lived relayed Endpoint messages.
Definition Livecache.h:172
std::vector< std::shared_ptr< Source > > sources
void addFixedPeer(std::string_view name, std::vector< beast::IP::Endpoint > const &addresses)
std::vector< Endpoint > redirect(SlotImp::ptr const &slot)
Return a list of addresses suitable for redirection.
std::vector< std::pair< std::shared_ptr< Slot >, std::vector< Endpoint > > > buildEndpointsForPeers()
void onRedirects(FwdIter first, FwdIter last, boost::asio::ip::tcp::endpoint const &remoteAddress)
void checkComplete(beast::IP::Endpoint const &remoteAddress, beast::IP::Endpoint const &checkedAddress, boost::system::error_code ec)
void addFixedPeer(std::string_view name, beast::IP::Endpoint const &ep)
void preprocess(SlotImp::ptr const &slot, Endpoints &list)
bool isValidAddress(beast::IP::Endpoint const &address)
void getFixed(std::size_t needed, Container &c, ConnectHandouts::Squelches &squelches)
Adds eligible Fixed addresses for outbound attempts.
std::pair< SlotImp::ptr, Result > newOutboundSlot(beast::IP::Endpoint const &remoteEndpoint)
std::pair< SlotImp::ptr, Result > newInboundSlot(beast::IP::Endpoint const &localEndpoint, beast::IP::Endpoint const &remoteEndpoint)
static std::string stateString(Slot::State state)
void fetch(std::shared_ptr< Source > const &source)
Result activate(SlotImp::ptr const &slot, PublicKey const &key, bool reserved)
std::multiset< beast::IP::Address > connectedAddresses
void addSource(std::shared_ptr< Source > const &source)
void writeSlots(beast::PropertyStream::Set &set, Slots const &slots)
std::vector< beast::IP::Endpoint > autoconnect()
Create new outbound connection attempts as needed.
Logic(clock_type &clock, Store &store, Checker &checker, beast::Journal journal)
std::map< beast::IP::Endpoint, std::shared_ptr< SlotImp > > Slots
void onEndpoints(SlotImp::ptr const &slot, Endpoints list)
void remove(SlotImp::ptr const &slot)
void addStaticSource(std::shared_ptr< Source > const &source)
std::map< beast::IP::Endpoint, Fixed > fixed_
bool fixed(beast::IP::Endpoint const &endpoint) const
int addBootcacheAddresses(IPAddresses const &list)
void onClosed(SlotImp::ptr const &slot)
void onWrite(beast::PropertyStream::Map &map)
bool onConnected(SlotImp::ptr const &slot, beast::IP::Endpoint const &localEndpoint)
bool fixed(beast::IP::Address const &address) const
void onFailure(SlotImp::ptr const &slot)
Receives handouts for redirecting a connection.
Definition Handouts.h:79
std::vector< Endpoint > & list()
Definition Handouts.h:101
bool fixed() const override
Returns true if this is a fixed connection.
Definition SlotImp.h:35
std::shared_ptr< SlotImp > ptr
Definition SlotImp.h:16
bool reserved() const override
Returns true if this is a reserved connection.
Definition SlotImp.h:41
std::string prefix() const
Definition SlotImp.h:71
void setListeningPort(std::uint16_t port)
Definition SlotImp.h:86
std::optional< beast::IP::Endpoint > const & localEndpoint() const override
The local endpoint of the socket, when known.
Definition SlotImp.h:59
State state() const override
Returns the state of the connection.
Definition SlotImp.h:47
beast::IP::Endpoint const & remoteEndpoint() const override
The remote endpoint of socket.
Definition SlotImp.h:53
bool inbound() const override
Returns true if this is an inbound connection.
Definition SlotImp.h:29
Abstract persistence for PeerFinder data.
Definition Store.h:7
A public key.
Definition PublicKey.h:42
T emplace_back(T... args)
T empty(T... args)
T end(T... args)
T erase(T... args)
T for_each(T... args)
T forward_as_tuple(T... args)
T make_shared(T... args)
T make_tuple(T... args)
boost::asio::ip::address Address
Definition IPAddress.h:19
boost::asio::ip::address_v6 AddressV6
Definition IPAddressV6.h:9
std::enable_if_t< IsAgedContainer< AgedContainer >::value, std::size_t > expire(AgedContainer &c, std::chrono::duration< Rep, Period > const &age)
Expire aged container items past the specified age.
static constexpr auto kMaxRedirects
Max redirects we will accept from one connection.
constexpr std::chrono::seconds kRecentAttemptDuration(60)
constexpr std::chrono::seconds kSecondsPerMessage(151)
std::vector< Endpoint > Endpoints
A set of Endpoint used for connecting.
std::string_view to_string(Result result) noexcept
Converts a Result enum value to its string representation.
void handout(TargetFwdIter first, TargetFwdIter last, SeqFwdIter seqFirst, SeqFwdIter seqLast)
Distributes objects to targets according to business rules.
Definition Handouts.h:47
std::vector< beast::IP::Endpoint > IPAddresses
Represents a set of addresses.
Result
Possible results from activating a slot.
beast::AbstractClock< std::chrono::steady_clock > clock_type
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section. If the named value is not found or doesn't parse as a T,...
void logicError(std::string const &how) noexcept
Called when faulty logic causes a broken invariant.
beast::xor_shift_engine & defaultPrng()
Return the default random engine.
XRPL_NO_SANITIZE_ADDRESS void Throw(Args &&... args)
Definition contract.h:49
T piecewise_construct
T shuffle(T... args)
T ref(T... args)
T reserve(T... args)
T resize(T... args)
T size(T... args)
static IP::Endpoint fromAsio(boost::asio::ip::address const &address)
Left justifies a field at the specified width.
Definition iosformat.h:14
PeerFinder configuration settings.
Describes a connectable peer address along with some metadata.
The results of a fetch.
Definition Source.h:22
boost::system::error_code error
Definition Source.h:26