rippled
Loading...
Searching...
No Matches
src/xrpld/peerfinder/detail/Logic.h
1#pragma once
2
3#include <xrpld/peerfinder/PeerfinderManager.h>
4#include <xrpld/peerfinder/detail/Bootcache.h>
5#include <xrpld/peerfinder/detail/Counts.h>
6#include <xrpld/peerfinder/detail/Fixed.h>
7#include <xrpld/peerfinder/detail/Handouts.h>
8#include <xrpld/peerfinder/detail/Livecache.h>
9#include <xrpld/peerfinder/detail/SlotImp.h>
10#include <xrpld/peerfinder/detail/Source.h>
11#include <xrpld/peerfinder/detail/Store.h>
12#include <xrpld/peerfinder/detail/iosformat.h>
13
14#include <xrpl/basics/Log.h>
15#include <xrpl/basics/contract.h>
16#include <xrpl/basics/random.h>
17#include <xrpl/beast/net/IPAddressConversion.h>
18#include <xrpl/beast/utility/WrappedSink.h>
19
20#include <algorithm>
21#include <functional>
22#include <map>
23#include <memory>
24#include <set>
25
26namespace xrpl {
27namespace PeerFinder {
28
33template <class Checker>
34class Logic
35{
36public:
37 // Maps remote endpoints to slots. Since a slot has a
38 // remote endpoint upon construction, this holds all counts.
39 //
41
46
48
49 // True if we are stopping.
50 bool stopping_ = false;
51
52 // The source we are currently fetching.
53 // This is used to cancel I/O during program exit.
55
56 // Configuration settings
58
59 // Slot counts and other aggregate statistics.
61
62 // A list of slots that should always be connected
64
65 // Live livecache from mtENDPOINTS messages
67
68 // LiveCache of addresses suitable for gaining initial connections
70
71 // Holds all counts
73
74 // The addresses (but not port) we are connected to. This includes
75 // outgoing connection attempts. Note that this set can contain
76 // duplicates (since the port is not set)
78
79 // Set of public keys belonging to active peers
81
82 // A list of dynamic sources to consult as a fallback
84
86
88
89 //--------------------------------------------------------------------------
90
91 Logic(clock_type& clock, Store& store, Checker& checker, beast::Journal journal)
92 : m_journal(journal)
93 , m_clock(clock)
94 , m_store(store)
95 , m_checker(checker)
96 , livecache_(m_clock, journal)
97 , bootcache_(store, m_clock, journal)
100 {
101 config({});
102 }
103
104 // Load persistent state information from the Store
105 //
106 void
108 {
109 std::lock_guard const _(lock_);
111 }
112
119 void
121 {
122 std::lock_guard const _(lock_);
123 stopping_ = true;
124 if (fetchSource_ != nullptr)
125 fetchSource_->cancel();
126 }
127
128 //--------------------------------------------------------------------------
129 //
130 // Manager
131 //
132 //--------------------------------------------------------------------------
133
134 void
135 config(Config const& c)
136 {
137 std::lock_guard const _(lock_);
138 config_ = c;
140 }
141
142 Config
144 {
145 std::lock_guard const _(lock_);
146 return config_;
147 }
148
149 void
154
155 void
157 {
158 std::lock_guard const _(lock_);
159
160 if (addresses.empty())
161 {
162 JLOG(m_journal.info()) << "Could not resolve fixed slot '" << name << "'";
163 return;
164 }
165
166 for (auto const& remote_address : addresses)
167 {
168 if (remote_address.port() == 0)
169 {
170 Throw<std::runtime_error>(
171 "Port not specified for address:" + remote_address.to_string());
172 }
173
174 auto result(fixed_.emplace(
176 std::forward_as_tuple(remote_address),
178
179 if (result.second)
180 {
181 JLOG(m_journal.debug())
182 << beast::leftw(18) << "Logic add fixed '" << name << "' at " << remote_address;
183 return;
184 }
185 }
186 }
187
188 //--------------------------------------------------------------------------
189
190 // Called when the Checker completes a connectivity test
191 void
193 beast::IP::Endpoint const& remoteAddress,
194 beast::IP::Endpoint const& checkedAddress,
195 boost::system::error_code ec)
196 {
197 if (ec == boost::asio::error::operation_aborted)
198 return;
199
200 std::lock_guard const _(lock_);
201 auto const iter(slots_.find(remoteAddress));
202 if (iter == slots_.end())
203 {
204 // The slot disconnected before we finished the check
205 JLOG(m_journal.debug()) << beast::leftw(18) << "Logic tested " << checkedAddress
206 << " but the connection was closed";
207 return;
208 }
209
210 SlotImp& slot(*iter->second);
211 slot.checked = true;
212 slot.connectivityCheckInProgress = false;
213
214 beast::WrappedSink sink{m_journal.sink(), slot.prefix()};
215 beast::Journal const journal{sink};
216
217 if (ec)
218 {
219 // VFALCO TODO Should we retry depending on the error?
220 slot.canAccept = false;
221 JLOG(journal.error()) << "Logic testing " << iter->first << " with error, "
222 << ec.message();
223 bootcache_.on_failure(checkedAddress);
224 return;
225 }
226
227 slot.canAccept = true;
228 slot.set_listening_port(checkedAddress.port());
229 JLOG(journal.debug()) << "Logic testing " << checkedAddress << " succeeded";
230 }
231
232 //--------------------------------------------------------------------------
233
236 beast::IP::Endpoint const& local_endpoint,
237 beast::IP::Endpoint const& remote_endpoint)
238 {
239 JLOG(m_journal.debug()) << beast::leftw(18) << "Logic accept" << remote_endpoint
240 << " on local " << local_endpoint;
241
242 std::lock_guard const _(lock_);
243
244 // Check for connection limit per address
245 if (is_public(remote_endpoint))
246 {
247 auto const count = connectedAddresses_.count(remote_endpoint.address());
248 if (count + 1 > config_.ipLimit)
249 {
250 JLOG(m_journal.debug()) << beast::leftw(18) << "Logic dropping inbound "
251 << remote_endpoint << " because of ip limits.";
252 return {SlotImp::ptr(), Result::ipLimitExceeded};
253 }
254 }
255
256 // Check for duplicate connection
257 if (slots_.find(remote_endpoint) != slots_.end())
258 {
259 JLOG(m_journal.debug()) << beast::leftw(18) << "Logic dropping " << remote_endpoint
260 << " as duplicate incoming";
261 return {SlotImp::ptr(), Result::duplicatePeer};
262 }
263
264 // Create the slot
265 SlotImp::ptr const slot(
267 local_endpoint, remote_endpoint, fixed(remote_endpoint.address()), m_clock));
268 // Add slot to table
269 auto const result(slots_.emplace(slot->remote_endpoint(), slot));
270 // Remote address must not already exist
271 XRPL_ASSERT(
272 result.second,
273 "xrpl::PeerFinder::Logic::new_inbound_slot : remote endpoint "
274 "inserted");
275 // Add to the connected address list
276 connectedAddresses_.emplace(remote_endpoint.address());
277
278 // Update counts
279 counts_.add(*slot);
280
281 return {result.first->second, Result::success};
282 }
283
284 // Can't check for self-connect because we don't know the local endpoint
287 {
288 JLOG(m_journal.debug()) << beast::leftw(18) << "Logic connect " << remote_endpoint;
289
290 std::lock_guard const _(lock_);
291
292 // Check for duplicate connection
293 if (slots_.find(remote_endpoint) != slots_.end())
294 {
295 JLOG(m_journal.debug()) << beast::leftw(18) << "Logic dropping " << remote_endpoint
296 << " as duplicate connect";
297 return {SlotImp::ptr(), Result::duplicatePeer};
298 }
299
300 // Create the slot
301 SlotImp::ptr const slot(
302 std::make_shared<SlotImp>(remote_endpoint, fixed(remote_endpoint), m_clock));
303
304 // Add slot to table
305 auto const result = slots_.emplace(slot->remote_endpoint(), slot);
306 // Remote address must not already exist
307 XRPL_ASSERT(
308 result.second,
309 "xrpl::PeerFinder::Logic::new_outbound_slot : remote endpoint "
310 "inserted");
311
312 // Add to the connected address list
313 connectedAddresses_.emplace(remote_endpoint.address());
314
315 // Update counts
316 counts_.add(*slot);
317
318 return {result.first->second, Result::success};
319 }
320
321 bool
322 onConnected(SlotImp::ptr const& slot, beast::IP::Endpoint const& local_endpoint)
323 {
324 beast::WrappedSink sink{m_journal.sink(), slot->prefix()};
325 beast::Journal const journal{sink};
326
327 JLOG(journal.trace()) << "Logic connected on local " << local_endpoint;
328
329 std::lock_guard const _(lock_);
330
331 // The object must exist in our table
332 XRPL_ASSERT(
333 slots_.find(slot->remote_endpoint()) != slots_.end(),
334 "xrpl::PeerFinder::Logic::onConnected : valid slot input");
335 // Assign the local endpoint now that it's known
336 slot->local_endpoint(local_endpoint);
337
338 // Check for self-connect by address
339 {
340 auto const iter(slots_.find(local_endpoint));
341 if (iter != slots_.end())
342 {
343 XRPL_ASSERT(
344 iter->second->local_endpoint() == slot->remote_endpoint(),
345 "xrpl::PeerFinder::Logic::onConnected : local and remote "
346 "endpoints do match");
347 JLOG(journal.warn()) << "Logic dropping as self connect";
348 return false;
349 }
350 }
351
352 // Update counts
353 counts_.remove(*slot);
354 slot->state(Slot::connected);
355 counts_.add(*slot);
356 return true;
357 }
358
359 Result
360 activate(SlotImp::ptr const& slot, PublicKey const& key, bool reserved)
361 {
362 beast::WrappedSink sink{m_journal.sink(), slot->prefix()};
363 beast::Journal const journal{sink};
364
365 JLOG(journal.debug()) << "Logic handshake " << slot->remote_endpoint() << " with "
366 << (reserved ? "reserved " : "") << "key " << key;
367
368 std::lock_guard const _(lock_);
369
370 // The object must exist in our table
371 XRPL_ASSERT(
372 slots_.find(slot->remote_endpoint()) != slots_.end(),
373 "xrpl::PeerFinder::Logic::activate : valid slot input");
374 // Must be accepted or connected
375 XRPL_ASSERT(
376 slot->state() == Slot::accept || slot->state() == Slot::connected,
377 "xrpl::PeerFinder::Logic::activate : valid slot state");
378
379 // Check for duplicate connection by key
380 if (keys_.find(key) != keys_.end())
381 return Result::duplicatePeer;
382
383 // If the peer belongs to a cluster or is reserved,
384 // update the slot to reflect that.
385 counts_.remove(*slot);
386 slot->reserved(reserved);
387 counts_.add(*slot);
388
389 // See if we have an open space for this slot
390 if (!counts_.can_activate(*slot))
391 {
392 if (!slot->inbound())
393 bootcache_.on_success(slot->remote_endpoint());
394 if (slot->inbound() && counts_.in_max() == 0)
395 return Result::inboundDisabled;
396 return Result::full;
397 }
398
399 // Set the key right before adding to the map, otherwise we might
400 // assert later when erasing the key.
401 slot->public_key(key);
402 {
403 [[maybe_unused]] bool const inserted = keys_.insert(key).second;
404 // Public key must not already exist
405 XRPL_ASSERT(inserted, "xrpl::PeerFinder::Logic::activate : public key inserted");
406 }
407
408 // Change state and update counts
409 counts_.remove(*slot);
410 slot->activate(m_clock.now());
411 counts_.add(*slot);
412
413 if (!slot->inbound())
414 bootcache_.on_success(slot->remote_endpoint());
415
416 // Mark fixed slot success
417 if (slot->fixed() && !slot->inbound())
418 {
419 auto iter(fixed_.find(slot->remote_endpoint()));
420 if (iter == fixed_.end())
422 "PeerFinder::Logic::activate(): remote_endpoint "
423 "missing from fixed_");
424
425 iter->second.success(m_clock.now());
426 JLOG(journal.trace()) << "Logic fixed success";
427 }
428
429 return Result::success;
430 }
431
438 {
439 std::lock_guard const _(lock_);
440 RedirectHandouts h(slot);
442 handout(&h, (&h) + 1, livecache_.hops.begin(), livecache_.hops.end());
443 return std::move(h.list());
444 }
445
449 // VFALCO TODO This should add the returned addresses to the
450 // squelch list in one go once the list is built,
451 // rather than having each module add to the squelch list.
454 {
456
457 std::lock_guard const _(lock_);
458
459 // Count how many more outbound attempts to make
460 //
461 auto needed(counts_.attempts_needed());
462 if (needed == 0)
463 return none;
464
465 ConnectHandouts h(needed, m_squelches);
466
467 // Make sure we don't connect to already-connected entries.
468 for (auto const& s : slots_)
469 {
470 auto const result(m_squelches.insert(s.second->remote_endpoint().address()));
471 if (!result.second)
472 m_squelches.touch(result.first);
473 }
474
475 // 1. Use Fixed if:
476 // Fixed active count is below fixed count AND
477 // ( There are eligible fixed addresses to try OR
478 // Any outbound attempts are in progress)
479 //
480 if (counts_.fixed_active() < fixed_.size())
481 {
482 get_fixed(needed, h.list(), m_squelches);
483
484 if (!h.list().empty())
485 {
486 JLOG(m_journal.debug())
487 << beast::leftw(18) << "Logic connect " << h.list().size() << " fixed";
488 return h.list();
489 }
490
491 if (counts_.attempts() > 0)
492 {
493 JLOG(m_journal.debug())
494 << beast::leftw(18) << "Logic waiting on " << counts_.attempts() << " attempts";
495 return none;
496 }
497 }
498
499 // Only proceed if auto connect is enabled and we
500 // have less than the desired number of outbound slots
501 //
503 return none;
504
505 // 2. Use Livecache if:
506 // There are any entries in the cache OR
507 // Any outbound attempts are in progress
508 //
509 {
511 handout(&h, (&h) + 1, livecache_.hops.rbegin(), livecache_.hops.rend());
512 if (!h.list().empty())
513 {
514 JLOG(m_journal.debug())
515 << beast::leftw(18) << "Logic connect " << h.list().size() << " live "
516 << ((h.list().size() > 1) ? "endpoints" : "endpoint");
517 return h.list();
518 }
519 else if (counts_.attempts() > 0)
520 {
521 JLOG(m_journal.debug())
522 << beast::leftw(18) << "Logic waiting on " << counts_.attempts() << " attempts";
523 return none;
524 }
525 }
526
527 /* 3. Bootcache refill
528 If the Bootcache is empty, try to get addresses from the current
529 set of Sources and add them into the Bootstrap cache.
530
531 Pseudocode:
532 If ( domainNames.count() > 0 AND (
533 unusedBootstrapIPs.count() == 0
534 OR activeNameResolutions.count() > 0) )
535 ForOneOrMore (DomainName that hasn't been resolved recently)
536 Contact DomainName and add entries to the
537 unusedBootstrapIPs return;
538 */
539
540 // 4. Use Bootcache if:
541 // There are any entries we haven't tried lately
542 //
543 for (auto iter(bootcache_.begin()); !h.full() && iter != bootcache_.end(); ++iter)
544 h.try_insert(*iter);
545
546 if (!h.list().empty())
547 {
548 JLOG(m_journal.debug())
549 << beast::leftw(18) << "Logic connect " << h.list().size() << " boot "
550 << ((h.list().size() > 1) ? "addresses" : "address");
551 return h.list();
552 }
553
554 // If we get here we are stuck
555 return none;
556 }
557
560 {
562
563 std::lock_guard const _(lock_);
564
565 clock_type::time_point const now = m_clock.now();
566 if (m_whenBroadcast <= now)
567 {
569
570 {
571 // build list of active slots
573 slots.reserve(slots_.size());
575 slots_.cbegin(), slots_.cend(), [&slots](Slots::value_type const& value) {
576 if (value.second->state() == Slot::active)
577 slots.emplace_back(value.second);
578 });
579 std::shuffle(slots.begin(), slots.end(), default_prng());
580
581 // build target vector
582 targets.reserve(slots.size());
583 std::for_each(slots.cbegin(), slots.cend(), [&targets](SlotImp::ptr const& slot) {
584 targets.emplace_back(slot);
585 });
586 }
587
588 /* VFALCO NOTE
589 This is a temporary measure. Once we know our own IP
590 address, the correct solution is to put it into the Livecache
591 at hops 0, and go through the regular handout path. This way
592 we avoid handing our address out too frequently, which this code
593 suffers from.
594 */
595 // Add an entry for ourselves if:
596 // 1. We want incoming
597 // 2. We have slots
598 // 3. We haven't failed the firewalled test
599 //
600 if (config_.wantIncoming && counts_.in_max() > 0)
601 {
602 Endpoint ep;
603 ep.hops = 0;
604 // we use the unspecified (0) address here because the value is
605 // irrelevant to recipients. When peers receive an endpoint
606 // with 0 hops, they use the socket remote_addr instead of the
607 // value in the message. Furthermore, since the address value
608 // is ignored, the type/version (ipv4 vs ipv6) doesn't matter
609 // either. ipv6 has a slightly more compact string
610 // representation of 0, so use that for self entries.
611 ep.address =
613 for (auto& t : targets)
614 t.insert(ep);
615 }
616
617 // build sequence of endpoints by hops
619 handout(targets.begin(), targets.end(), livecache_.hops.begin(), livecache_.hops.end());
620
621 // broadcast
622 for (auto const& t : targets)
623 {
624 SlotImp::ptr const& slot = t.slot();
625 auto const& list = t.list();
626 beast::WrappedSink sink{m_journal.sink(), slot->prefix()};
627 beast::Journal const journal{sink};
628 JLOG(journal.trace()) << "Logic sending " << list.size()
629 << ((list.size() == 1) ? " endpoint" : " endpoints");
630 result.push_back(std::make_pair(slot, list));
631 }
632
634 }
635
636 return result;
637 }
638
639 void
641 {
642 std::lock_guard const _(lock_);
643
644 // Expire the Livecache
646
647 // Expire the recent cache in each slot
648 for (auto const& entry : slots_)
649 entry.second->expire();
650
651 // Expire the recent attempts table
653
655 }
656
657 //--------------------------------------------------------------------------
658
659 // Validate and clean up the list that we received from the slot.
660 void
662 {
663 bool neighbor(false);
664 for (auto iter = list.begin(); iter != list.end();)
665 {
666 Endpoint& ep(*iter);
667
668 // Enforce hop limit
669 if (ep.hops > Tuning::maxHops)
670 {
671 JLOG(m_journal.debug()) << beast::leftw(18) << "Endpoints drop " << ep.address
672 << " for excess hops " << ep.hops;
673 iter = list.erase(iter);
674 continue;
675 }
676
677 // See if we are directly connected
678 if (ep.hops == 0)
679 {
680 if (!neighbor)
681 {
682 // Fill in our neighbors remote address
683 neighbor = true;
684 ep.address = slot->remote_endpoint().at_port(ep.address.port());
685 }
686 else
687 {
688 JLOG(m_journal.debug())
689 << beast::leftw(18) << "Endpoints drop " << ep.address << " for extra self";
690 iter = list.erase(iter);
691 continue;
692 }
693 }
694
695 // Discard invalid addresses
696 if (!is_valid_address(ep.address))
697 {
698 JLOG(m_journal.debug())
699 << beast::leftw(18) << "Endpoints drop " << ep.address << " as invalid";
700 iter = list.erase(iter);
701 continue;
702 }
703
704 // Filter duplicates
705 if (std::any_of(list.begin(), iter, [ep](Endpoints::value_type const& other) {
706 return ep.address == other.address;
707 }))
708 {
709 JLOG(m_journal.debug())
710 << beast::leftw(18) << "Endpoints drop " << ep.address << " as duplicate";
711 iter = list.erase(iter);
712 continue;
713 }
714
715 // Increment hop count on the incoming message, so
716 // we store it at the hop count we will send it at.
717 //
718 ++ep.hops;
719
720 ++iter;
721 }
722 }
723
724 void
726 {
727 beast::WrappedSink sink{m_journal.sink(), slot->prefix()};
728 beast::Journal const journal{sink};
729
730 // If we're sent too many endpoints, sample them at random:
732 {
733 std::shuffle(list.begin(), list.end(), default_prng());
735 }
736
737 JLOG(journal.trace()) << "Endpoints contained " << list.size()
738 << ((list.size() > 1) ? " entries" : " entry");
739
740 std::lock_guard const _(lock_);
741
742 // The object must exist in our table
743 XRPL_ASSERT(
744 slots_.find(slot->remote_endpoint()) != slots_.end(),
745 "xrpl::PeerFinder::Logic::on_endpoints : valid slot input");
746
747 // Must be handshaked!
748 XRPL_ASSERT(
749 slot->state() == Slot::active,
750 "xrpl::PeerFinder::Logic::on_endpoints : valid slot state");
751
753
754 // Limit how often we accept new endpoints
755 if (slot->whenAcceptEndpoints > now)
756 return;
757
758 preprocess(slot, list);
759
760 for (auto const& ep : list)
761 {
762 XRPL_ASSERT(ep.hops, "xrpl::PeerFinder::Logic::on_endpoints : nonzero hops");
763
764 slot->recent.insert(ep.address, ep.hops);
765
766 // Note hops has been incremented, so 1
767 // means a directly connected neighbor.
768 //
769 if (ep.hops == 1)
770 {
771 if (slot->connectivityCheckInProgress)
772 {
773 JLOG(journal.debug())
774 << "Logic testing " << ep.address << " already in progress";
775 continue;
776 }
777
778 if (!slot->checked)
779 {
780 // Mark that a check for this slot is now in progress.
781 slot->connectivityCheckInProgress = true;
782
783 // Test the slot's listening port before
784 // adding it to the livecache for the first time.
785 //
787 ep.address,
788 std::bind(
790 this,
791 slot->remote_endpoint(),
792 ep.address,
793 std::placeholders::_1));
794
795 // Note that we simply discard the first Endpoint
796 // that the neighbor sends when we perform the
797 // listening test. They will just send us another
798 // one in a few seconds.
799
800 continue;
801 }
802
803 // If they failed the test then skip the address
804 if (!slot->canAccept)
805 continue;
806 }
807
808 // We only add to the livecache if the neighbor passed the
809 // listening test, else we silently drop neighbor endpoint
810 // since their listening port is misconfigured.
811 //
812 livecache_.insert(ep);
813 bootcache_.insert(ep.address);
814 }
815
816 slot->whenAcceptEndpoints = now + Tuning::secondsPerMessage;
817 }
818
819 //--------------------------------------------------------------------------
820
821 void
822 remove(SlotImp::ptr const& slot)
823 {
824 {
825 auto const iter = slots_.find(slot->remote_endpoint());
826 // The slot must exist in the table
827 if (iter == slots_.end())
829 "PeerFinder::Logic::remove(): remote_endpoint "
830 "missing from slots_");
831
832 // Remove from slot by IP table
833 slots_.erase(iter);
834 }
835 // Remove the key if present
836 if (slot->public_key() != std::nullopt)
837 {
838 auto const iter = keys_.find(*slot->public_key());
839 // Key must exist
840 if (iter == keys_.end())
842 "PeerFinder::Logic::remove(): public_key missing "
843 "from keys_");
844
845 keys_.erase(iter);
846 }
847 // Remove from connected address table
848 {
849 auto const iter(connectedAddresses_.find(slot->remote_endpoint().address()));
850 // Address must exist
851 if (iter == connectedAddresses_.end())
853 "PeerFinder::Logic::remove(): remote_endpoint "
854 "address missing from connectedAddresses_");
855
857 }
858
859 // Update counts
860 counts_.remove(*slot);
861 }
862
863 void
865 {
866 std::lock_guard const _(lock_);
867
868 remove(slot);
869
870 beast::WrappedSink sink{m_journal.sink(), slot->prefix()};
871 beast::Journal const journal{sink};
872
873 // Mark fixed slot failure
874 if (slot->fixed() && !slot->inbound() && slot->state() != Slot::active)
875 {
876 auto iter(fixed_.find(slot->remote_endpoint()));
877 if (iter == fixed_.end())
879 "PeerFinder::Logic::on_closed(): remote_endpoint "
880 "missing from fixed_");
881
882 iter->second.failure(m_clock.now());
883 JLOG(journal.debug()) << "Logic fixed failed";
884 }
885
886 // Do state specific bookkeeping
887 switch (slot->state())
888 {
889 case Slot::accept:
890 JLOG(journal.trace()) << "Logic accept failed";
891 break;
892
893 case Slot::connect:
894 case Slot::connected:
895 bootcache_.on_failure(slot->remote_endpoint());
896 // VFALCO TODO If the address exists in the ephemeral/live
897 // endpoint livecache then we should mark the
898 // failure
899 // as if it didn't pass the listening test. We should also
900 // avoid propagating the address.
901 break;
902
903 case Slot::active:
904 JLOG(journal.trace()) << "Logic close";
905 break;
906
907 case Slot::closing:
908 JLOG(journal.trace()) << "Logic finished";
909 break;
910
911 // LCOV_EXCL_START
912 default:
913 UNREACHABLE(
914 "xrpl::PeerFinder::Logic::on_closed : invalid slot "
915 "state");
916 break;
917 // LCOV_EXCL_STOP
918 }
919 }
920
921 void
923 {
924 std::lock_guard const _(lock_);
925
926 bootcache_.on_failure(slot->remote_endpoint());
927 }
928
929 // Insert a set of redirect IP addresses into the Bootcache
930 template <class FwdIter>
931 void
932 onRedirects(FwdIter first, FwdIter last, boost::asio::ip::tcp::endpoint const& remote_address);
933
934 //--------------------------------------------------------------------------
935
936 // Returns `true` if the address matches a fixed slot address
937 // Must have the lock held
938 bool
939 fixed(beast::IP::Endpoint const& endpoint) const
940 {
941 for (auto const& entry : fixed_)
942 if (entry.first == endpoint)
943 return true;
944 return false;
945 }
946
947 // Returns `true` if the address matches a fixed slot address
948 // Note that this does not use the port information in the IP::Endpoint
949 // Must have the lock held
950 bool
951 fixed(beast::IP::Address const& address) const
952 {
953 for (auto const& entry : fixed_)
954 if (entry.first.address() == address)
955 return true;
956 return false;
957 }
958
959 //--------------------------------------------------------------------------
960 //
961 // Connection Strategy
962 //
963 //--------------------------------------------------------------------------
964
966 template <class Container>
967 void
968 get_fixed(std::size_t needed, Container& c, typename ConnectHandouts::Squelches& squelches)
969 {
970 auto const now(m_clock.now());
971 for (auto iter = fixed_.begin(); needed && iter != fixed_.end(); ++iter)
972 {
973 auto const& address(iter->first.address());
974 if (iter->second.when() <= now && squelches.find(address) == squelches.end() &&
975 std::none_of(slots_.cbegin(), slots_.cend(), [address](Slots::value_type const& v) {
976 return address == v.first.address();
977 }))
978 {
979 squelches.insert(iter->first.address());
980 c.push_back(iter->first);
981 --needed;
982 }
983 }
984 }
985
986 //--------------------------------------------------------------------------
987
988 void
990 {
991 fetch(source);
992 }
993
994 void
996 {
997 m_sources.push_back(source);
998 }
999
1000 //--------------------------------------------------------------------------
1001 //
1002 // Bootcache livecache sources
1003 //
1004 //--------------------------------------------------------------------------
1005
1006 // Add a set of addresses.
1007 // Returns the number of addresses added.
1008 //
1009 int
1011 {
1012 int count(0);
1013 std::lock_guard const _(lock_);
1014 for (auto const& addr : list)
1015 {
1016 if (bootcache_.insertStatic(addr))
1017 ++count;
1018 }
1019 return count;
1020 }
1021
1022 // Fetch bootcache addresses from the specified source.
1023 void
1025 {
1026 Source::Results results;
1027
1028 {
1029 {
1030 std::lock_guard const _(lock_);
1031 if (stopping_)
1032 return;
1033 fetchSource_ = source;
1034 }
1035
1036 // VFALCO NOTE The fetch is synchronous,
1037 // not sure if that's a good thing.
1038 //
1039 source->fetch(results, m_journal);
1040
1041 {
1042 std::lock_guard const _(lock_);
1043 if (stopping_)
1044 return;
1045 fetchSource_ = nullptr;
1046 }
1047 }
1048
1049 if (!results.error)
1050 {
1051 int const count(addBootcacheAddresses(results.addresses));
1052 JLOG(m_journal.info())
1053 << beast::leftw(18) << "Logic added " << count << " new "
1054 << ((count == 1) ? "address" : "addresses") << " from " << source->name();
1055 }
1056 else
1057 {
1058 JLOG(m_journal.error())
1059 << beast::leftw(18) << "Logic failed "
1060 << "'" << source->name() << "' fetch, " << results.error.message();
1061 }
1062 }
1063
1064 //--------------------------------------------------------------------------
1065 //
1066 // Endpoint message handling
1067 //
1068 //--------------------------------------------------------------------------
1069
1070 // Returns true if the IP::Endpoint contains no invalid data.
1071 bool
1073 {
1074 if (is_unspecified(address))
1075 return false;
1076 if (!is_public(address))
1077 return false;
1078 if (address.port() == 0)
1079 return false;
1080 return true;
1081 }
1082
1083 //--------------------------------------------------------------------------
1084 //
1085 // PropertyStream
1086 //
1087 //--------------------------------------------------------------------------
1088
1089 void
1091 {
1092 for (auto const& entry : slots)
1093 {
1095 SlotImp const& slot(*entry.second);
1096 if (slot.local_endpoint() != std::nullopt)
1097 item["local_address"] = to_string(*slot.local_endpoint());
1098 item["remote_address"] = to_string(slot.remote_endpoint());
1099 if (slot.inbound())
1100 item["inbound"] = "yes";
1101 if (slot.fixed())
1102 item["fixed"] = "yes";
1103 if (slot.reserved())
1104 item["reserved"] = "yes";
1105
1106 item["state"] = stateString(slot.state());
1107 }
1108 }
1109
1110 void
1112 {
1113 std::lock_guard const _(lock_);
1114
1115 // VFALCO NOTE These ugly casts are needed because
1116 // of how std::size_t is declared on some linuxes
1117 //
1118 map["bootcache"] = std::uint32_t(bootcache_.size());
1119 map["fixed"] = std::uint32_t(fixed_.size());
1120
1121 {
1122 beast::PropertyStream::Set child("peers", map);
1123 writeSlots(child, slots_);
1124 }
1125
1126 {
1127 beast::PropertyStream::Map child("counts", map);
1128 counts_.onWrite(child);
1129 }
1130
1131 {
1132 beast::PropertyStream::Map child("config", map);
1133 config_.onWrite(child);
1134 }
1135
1136 {
1137 beast::PropertyStream::Map child("livecache", map);
1138 livecache_.onWrite(child);
1139 }
1140
1141 {
1142 beast::PropertyStream::Map child("bootcache", map);
1143 bootcache_.onWrite(child);
1144 }
1145 }
1146
1147 //--------------------------------------------------------------------------
1148 //
1149 // Diagnostics
1150 //
1151 //--------------------------------------------------------------------------
1152
1153 Counts const&
1154 counts() const
1155 {
1156 return counts_;
1157 }
1158
1159 static std::string
1161 {
1162 switch (state)
1163 {
1164 case Slot::accept:
1165 return "accept";
1166 case Slot::connect:
1167 return "connect";
1168 case Slot::connected:
1169 return "connected";
1170 case Slot::active:
1171 return "active";
1172 case Slot::closing:
1173 return "closing";
1174 default:
1175 break;
1176 };
1177 return "?";
1178 }
1179};
1180
1181//------------------------------------------------------------------------------
1182
1183template <class Checker>
1184template <class FwdIter>
1185void
1187 FwdIter first,
1188 FwdIter last,
1189 boost::asio::ip::tcp::endpoint const& remote_address)
1190{
1191 std::lock_guard const _(lock_);
1192 std::size_t n = 0;
1193 for (; first != last && n < Tuning::maxRedirects; ++first, ++n)
1194 bootcache_.insert(beast::IPAddressConversion::from_asio(*first));
1195 if (n > 0)
1196 {
1197 JLOG(m_journal.trace()) << beast::leftw(18) << "Logic add " << n << " redirect IPs from "
1198 << remote_address;
1199 }
1200}
1201
1202} // namespace PeerFinder
1203} // namespace xrpl
T any_of(T... args)
T cbegin(T... args)
T bind(T... args)
A version-independent IP address and port combination.
Definition IPEndpoint.h:18
Address const & address() const
Returns the address portion of this endpoint.
Definition IPEndpoint.h:55
Endpoint at_port(Port port) const
Returns a new Endpoint with a different port.
Definition IPEndpoint.h:48
Port port() const
Returns the port number on the endpoint.
Definition IPEndpoint.h:41
A generic endpoint for log messages.
Definition Journal.h:40
Stream error() const
Definition Journal.h:319
Stream debug() const
Definition Journal.h:301
Sink & sink() const
Returns the Sink associated with this Journal.
Definition Journal.h:270
Stream info() const
Definition Journal.h:307
Wraps a Journal::Sink to prefix its output with a string.
Definition WrappedSink.h:14
typename Clock::time_point time_point
virtual time_point now() const =0
Returns the current time.
Associative container where each element is also indexed by time.
auto insert(value_type const &value) -> typename std::enable_if<!maybe_multi, std::pair< iterator, bool > >::type
void touch(beast::detail::aged_container_iterator< is_const, Iterator > pos)
Stores IP addresses useful for gaining initial connections.
Definition Bootcache.h:34
void on_failure(beast::IP::Endpoint const &endpoint)
Called when an outbound connection attempt fails to handshake.
bool insert(beast::IP::Endpoint const &endpoint)
Add a newly-learned address to the cache.
Definition Bootcache.cpp:89
void periodicActivity()
Stores the cache in the persistent database on a timer.
void on_success(beast::IP::Endpoint const &endpoint)
Called when an outbound connection handshake completes.
void onWrite(beast::PropertyStream::Map &map)
Write the cache state to the property stream.
const_iterator begin() const
IP::Endpoint iterators that traverse in decreasing valence.
Definition Bootcache.cpp:36
void load()
Load the persisted data from the Store into the container.
Definition Bootcache.cpp:69
bool insertStatic(beast::IP::Endpoint const &endpoint)
Add a staticallyconfigured address to the cache.
const_iterator end() const
Definition Bootcache.cpp:48
map_type::size_type size() const
Returns the number of entries in the cache.
Definition Bootcache.cpp:30
Tests remote listening sockets to make sure they are connectable.
Definition Checker.h:19
void async_connect(beast::IP::Endpoint const &endpoint, Handler &&handler)
Performs an async connection test on the specified endpoint.
Definition Checker.h:178
Receives handouts for making automatic connections.
Definition Handouts.h:246
bool try_insert(beast::IP::Endpoint const &endpoint)
Definition Handouts.h:307
Manages the count of available connections for the various slots.
Definition Counts.h:14
void onWrite(beast::PropertyStream::Map &map)
Output statistics.
Definition Counts.h:190
void remove(Slot const &s)
Removes the slot state and properties from the slot counts.
Definition Counts.h:25
std::size_t attempts() const
Returns the number of outbound connection attempts.
Definition Counts.h:59
int out_max() const
Returns the total number of outbound slots.
Definition Counts.h:66
bool can_activate(Slot const &s) const
Returns true if the slot can become active.
Definition Counts.h:32
int in_max() const
Returns the total number of inbound slots.
Definition Counts.h:128
std::size_t fixed_active() const
Returns the number of active fixed connections.
Definition Counts.h:89
void add(Slot const &s)
Adds the slot state and properties to the slot counts.
Definition Counts.h:18
void onConfig(Config const &config)
Called when the config is set or changed.
Definition Counts.h:98
int out_active() const
Returns the number of outbound peers assigned an open slot.
Definition Counts.h:75
std::size_t attempts_needed() const
Returns the number of attempts needed to bring us to the max.
Definition Counts.h:50
void shuffle()
Shuffle each hop list.
Definition Livecache.h:462
The Livecache holds the short-lived relayed Endpoint messages.
Definition Livecache.h:172
class xrpl::PeerFinder::Livecache::hops_t hops
void expire()
Erase entries whose time has expired.
Definition Livecache.h:370
void insert(Endpoint const &ep)
Creates or updates an existing Element based on a new message.
Definition Livecache.h:392
void onWrite(beast::PropertyStream::Map &map)
Output statistics.
Definition Livecache.h:439
The Logic for maintaining the list of Slot addresses.
void get_fixed(std::size_t needed, Container &c, typename ConnectHandouts::Squelches &squelches)
Adds eligible Fixed addresses for outbound attempts.
void on_closed(SlotImp::ptr const &slot)
std::vector< Endpoint > redirect(SlotImp::ptr const &slot)
Return a list of addresses suitable for redirection.
std::vector< std::pair< std::shared_ptr< Slot >, std::vector< Endpoint > > > buildEndpointsForPeers()
void addFixedPeer(std::string const &name, std::vector< beast::IP::Endpoint > const &addresses)
void onRedirects(FwdIter first, FwdIter last, boost::asio::ip::tcp::endpoint const &remote_address)
void checkComplete(beast::IP::Endpoint const &remoteAddress, beast::IP::Endpoint const &checkedAddress, boost::system::error_code ec)
void addFixedPeer(std::string const &name, beast::IP::Endpoint const &ep)
void preprocess(SlotImp::ptr const &slot, Endpoints &list)
static std::string stateString(Slot::State state)
std::pair< SlotImp::ptr, Result > new_inbound_slot(beast::IP::Endpoint const &local_endpoint, beast::IP::Endpoint const &remote_endpoint)
void fetch(std::shared_ptr< Source > const &source)
Result activate(SlotImp::ptr const &slot, PublicKey const &key, bool reserved)
void addSource(std::shared_ptr< Source > const &source)
void on_endpoints(SlotImp::ptr const &slot, Endpoints list)
void writeSlots(beast::PropertyStream::Set &set, Slots const &slots)
bool onConnected(SlotImp::ptr const &slot, beast::IP::Endpoint const &local_endpoint)
bool is_valid_address(beast::IP::Endpoint const &address)
std::vector< std::shared_ptr< Source > > m_sources
std::vector< beast::IP::Endpoint > autoconnect()
Create new outbound connection attempts as needed.
Logic(clock_type &clock, Store &store, Checker &checker, beast::Journal journal)
void remove(SlotImp::ptr const &slot)
void addStaticSource(std::shared_ptr< Source > const &source)
std::multiset< beast::IP::Address > connectedAddresses_
std::pair< SlotImp::ptr, Result > new_outbound_slot(beast::IP::Endpoint const &remote_endpoint)
std::map< beast::IP::Endpoint, Fixed > fixed_
void on_failure(SlotImp::ptr const &slot)
bool fixed(beast::IP::Endpoint const &endpoint) const
int addBootcacheAddresses(IPAddresses const &list)
void onWrite(beast::PropertyStream::Map &map)
bool fixed(beast::IP::Address const &address) const
Receives handouts for redirecting a connection.
Definition Handouts.h:78
std::vector< Endpoint > & list()
Definition Handouts.h:100
std::optional< beast::IP::Endpoint > const & local_endpoint() const override
The local endpoint of the socket, when known.
Definition SlotImp.h:60
bool fixed() const override
Returns true if this is a fixed connection.
Definition SlotImp.h:36
void set_listening_port(std::uint16_t port)
Definition SlotImp.h:87
std::shared_ptr< SlotImp > ptr
Definition SlotImp.h:17
bool reserved() const override
Returns true if this is a reserved connection.
Definition SlotImp.h:42
beast::IP::Endpoint const & remote_endpoint() const override
The remote endpoint of socket.
Definition SlotImp.h:54
std::string prefix() const
Definition SlotImp.h:72
State state() const override
Returns the state of the connection.
Definition SlotImp.h:48
bool inbound() const override
Returns true if this is an inbound connection.
Definition SlotImp.h:30
Abstract persistence for PeerFinder data.
Definition Store.h:8
A public key.
Definition PublicKey.h:42
T count(T... args)
T emplace(T... args)
T empty(T... args)
T end(T... args)
T erase(T... args)
T find(T... args)
T for_each(T... args)
T forward_as_tuple(T... args)
T is_same_v
T make_pair(T... args)
T make_tuple(T... args)
boost::asio::ip::address_v6 AddressV6
Definition IPAddressV6.h:10
boost::asio::ip::address Address
Definition IPAddress.h:19
std::enable_if< is_aged_container< AgedContainer >::value, std::size_t >::type expire(AgedContainer &c, std::chrono::duration< Rep, Period > const &age)
Expire aged container items past the specified age.
std::chrono::seconds constexpr recentAttemptDuration(60)
std::chrono::seconds constexpr secondsPerMessage(151)
std::string_view to_string(Result result) noexcept
Converts a Result enum value to its string representation.
Result
Possible results from activating a slot.
void handout(TargetFwdIter first, TargetFwdIter last, SeqFwdIter seq_first, SeqFwdIter seq_last)
Distributes objects to targets according to business rules.
Definition Handouts.h:46
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
void LogicError(std::string const &how) noexcept
Called when faulty logic causes a broken invariant.
beast::xor_shift_engine & default_prng()
Return the default random engine.
T piecewise_construct
T push_back(T... args)
T shuffle(T... args)
T ref(T... args)
T reserve(T... args)
T resize(T... args)
T size(T... args)
static IP::Endpoint from_asio(boost::asio::ip::address const &address)
Left justifies a field at the specified width.
Definition iosformat.h:14
PeerFinder configuration settings.
int ipLimit
Limit how many incoming connections we allow per IP.
void onWrite(beast::PropertyStream::Map &map) const
Write the configuration into a property stream.
bool wantIncoming
true if we want to accept incoming connections.
bool autoConnect
true if we want to establish connections automatically
std::uint16_t listeningPort
The listening port number.
Describes a connectable peer address along with some metadata.
The results of a fetch.
Definition Source.h:23
boost::system::error_code error
Definition Source.h:27