rippled
Loading...
Searching...
No Matches
src/xrpld/peerfinder/detail/Logic.h
1#ifndef XRPL_PEERFINDER_LOGIC_H_INCLUDED
2#define XRPL_PEERFINDER_LOGIC_H_INCLUDED
3
4#include <xrpld/peerfinder/PeerfinderManager.h>
5#include <xrpld/peerfinder/detail/Bootcache.h>
6#include <xrpld/peerfinder/detail/Counts.h>
7#include <xrpld/peerfinder/detail/Fixed.h>
8#include <xrpld/peerfinder/detail/Handouts.h>
9#include <xrpld/peerfinder/detail/Livecache.h>
10#include <xrpld/peerfinder/detail/SlotImp.h>
11#include <xrpld/peerfinder/detail/Source.h>
12#include <xrpld/peerfinder/detail/Store.h>
13#include <xrpld/peerfinder/detail/iosformat.h>
14
15#include <xrpl/basics/Log.h>
16#include <xrpl/basics/contract.h>
17#include <xrpl/basics/random.h>
18#include <xrpl/beast/net/IPAddressConversion.h>
19#include <xrpl/beast/utility/WrappedSink.h>
20
21#include <algorithm>
22#include <functional>
23#include <map>
24#include <memory>
25#include <set>
26
27namespace ripple {
28namespace PeerFinder {
29
34template <class Checker>
35class Logic
36{
37public:
38 // Maps remote endpoints to slots. Since a slot has a
39 // remote endpoint upon construction, this holds all slots.
40 //
42
47
49
50 // True if we are stopping.
51 bool stopping_ = false;
52
53 // The source we are currently fetching.
54 // This is used to cancel I/O during program exit.
56
57 // Configuration settings
59
60 // Slot counts and other aggregate statistics.
62
63 // A list of slots that should always be connected
65
66 // Livecache of endpoints relayed in mtENDPOINTS messages
68
69 // Bootcache of addresses suitable for gaining initial connections
71
72 // Holds all slots
74
75 // The addresses (but not port) we are connected to. This includes
76 // outgoing connection attempts. Note that this set can contain
77 // duplicates (since the port is not set)
79
80 // Set of public keys belonging to active peers
82
83 // A list of dynamic sources to consult as a fallback
85
87
89
90 //--------------------------------------------------------------------------
91
93 clock_type& clock,
94 Store& store,
95 Checker& checker,
96 beast::Journal journal)
97 : m_journal(journal)
98 , m_clock(clock)
99 , m_store(store)
100 , m_checker(checker)
101 , livecache_(m_clock, journal)
102 , bootcache_(store, m_clock, journal)
103 , m_whenBroadcast(m_clock.now())
105 {
106 config({});
107 }
108
109 // Load persistent state information from the Store
110 //
111 void
113 {
116 }
117
124 void
126 {
128 stopping_ = true;
129 if (fetchSource_ != nullptr)
130 fetchSource_->cancel();
131 }
132
133 //--------------------------------------------------------------------------
134 //
135 // Manager
136 //
137 //--------------------------------------------------------------------------
138
139 void
140 config(Config const& c)
141 {
143 config_ = c;
145 }
146
147 Config
149 {
151 return config_;
152 }
153
154 void
159
160 void
162 std::string const& name,
163 std::vector<beast::IP::Endpoint> const& addresses)
164 {
166
167 if (addresses.empty())
168 {
169 JLOG(m_journal.info())
170 << "Could not resolve fixed slot '" << name << "'";
171 return;
172 }
173
174 for (auto const& remote_address : addresses)
175 {
176 if (remote_address.port() == 0)
177 {
178 Throw<std::runtime_error>(
179 "Port not specified for address:" +
180 remote_address.to_string());
181 }
182
183 auto result(fixed_.emplace(
185 std::forward_as_tuple(remote_address),
187
188 if (result.second)
189 {
190 JLOG(m_journal.debug())
191 << beast::leftw(18) << "Logic add fixed '" << name
192 << "' at " << remote_address;
193 return;
194 }
195 }
196 }
197
198 //--------------------------------------------------------------------------
199
200 // Called when the Checker completes a connectivity test
201 void
203 beast::IP::Endpoint const& remoteAddress,
204 beast::IP::Endpoint const& checkedAddress,
205 boost::system::error_code ec)
206 {
207 if (ec == boost::asio::error::operation_aborted)
208 return;
209
211 auto const iter(slots_.find(remoteAddress));
212 if (iter == slots_.end())
213 {
214 // The slot disconnected before we finished the check
215 JLOG(m_journal.debug())
216 << beast::leftw(18) << "Logic tested " << checkedAddress
217 << " but the connection was closed";
218 return;
219 }
220
221 SlotImp& slot(*iter->second);
222 slot.checked = true;
223 slot.connectivityCheckInProgress = false;
224
225 beast::WrappedSink sink{m_journal.sink(), slot.prefix()};
226 beast::Journal journal{sink};
227
228 if (ec)
229 {
230 // VFALCO TODO Should we retry depending on the error?
231 slot.canAccept = false;
232 JLOG(journal.error()) << "Logic testing " << iter->first
233 << " with error, " << ec.message();
234 bootcache_.on_failure(checkedAddress);
235 return;
236 }
237
238 slot.canAccept = true;
239 slot.set_listening_port(checkedAddress.port());
240 JLOG(journal.debug())
241 << "Logic testing " << checkedAddress << " succeeded";
242 }
243
244 //--------------------------------------------------------------------------
245
248 beast::IP::Endpoint const& local_endpoint,
249 beast::IP::Endpoint const& remote_endpoint)
250 {
251 JLOG(m_journal.debug())
252 << beast::leftw(18) << "Logic accept" << remote_endpoint
253 << " on local " << local_endpoint;
254
256
257 // Check for connection limit per address
258 if (is_public(remote_endpoint))
259 {
260 auto const count =
261 connectedAddresses_.count(remote_endpoint.address());
262 if (count + 1 > config_.ipLimit)
263 {
264 JLOG(m_journal.debug())
265 << beast::leftw(18) << "Logic dropping inbound "
266 << remote_endpoint << " because of ip limits.";
267 return {SlotImp::ptr(), Result::ipLimitExceeded};
268 }
269 }
270
271 // Check for duplicate connection
272 if (slots_.find(remote_endpoint) != slots_.end())
273 {
274 JLOG(m_journal.debug())
275 << beast::leftw(18) << "Logic dropping " << remote_endpoint
276 << " as duplicate incoming";
277 return {SlotImp::ptr(), Result::duplicatePeer};
278 }
279
280 // Create the slot
282 local_endpoint,
283 remote_endpoint,
284 fixed(remote_endpoint.address()),
285 m_clock));
286 // Add slot to table
287 auto const result(slots_.emplace(slot->remote_endpoint(), slot));
288 // Remote address must not already exist
289 XRPL_ASSERT(
290 result.second,
291 "ripple::PeerFinder::Logic::new_inbound_slot : remote endpoint "
292 "inserted");
293 // Add to the connected address list
294 connectedAddresses_.emplace(remote_endpoint.address());
295
296 // Update counts
297 counts_.add(*slot);
298
299 return {result.first->second, Result::success};
300 }
301
302 // Can't check for self-connect because we don't know the local endpoint
305 {
306 JLOG(m_journal.debug())
307 << beast::leftw(18) << "Logic connect " << remote_endpoint;
308
310
311 // Check for duplicate connection
312 if (slots_.find(remote_endpoint) != slots_.end())
313 {
314 JLOG(m_journal.debug())
315 << beast::leftw(18) << "Logic dropping " << remote_endpoint
316 << " as duplicate connect";
317 return {SlotImp::ptr(), Result::duplicatePeer};
318 }
319
320 // Create the slot
322 remote_endpoint, fixed(remote_endpoint), m_clock));
323
324 // Add slot to table
325 auto const result = slots_.emplace(slot->remote_endpoint(), slot);
326 // Remote address must not already exist
327 XRPL_ASSERT(
328 result.second,
329 "ripple::PeerFinder::Logic::new_outbound_slot : remote endpoint "
330 "inserted");
331
332 // Add to the connected address list
333 connectedAddresses_.emplace(remote_endpoint.address());
334
335 // Update counts
336 counts_.add(*slot);
337
338 return {result.first->second, Result::success};
339 }
340
341 bool
343 SlotImp::ptr const& slot,
344 beast::IP::Endpoint const& local_endpoint)
345 {
346 beast::WrappedSink sink{m_journal.sink(), slot->prefix()};
347 beast::Journal journal{sink};
348
349 JLOG(journal.trace()) << "Logic connected on local " << local_endpoint;
350
352
353 // The object must exist in our table
354 XRPL_ASSERT(
355 slots_.find(slot->remote_endpoint()) != slots_.end(),
356 "ripple::PeerFinder::Logic::onConnected : valid slot input");
357 // Assign the local endpoint now that it's known
358 slot->local_endpoint(local_endpoint);
359
360 // Check for self-connect by address
361 {
362 auto const iter(slots_.find(local_endpoint));
363 if (iter != slots_.end())
364 {
365 XRPL_ASSERT(
366 iter->second->local_endpoint() == slot->remote_endpoint(),
367 "ripple::PeerFinder::Logic::onConnected : local and remote "
368 "endpoints do match");
369 JLOG(journal.warn()) << "Logic dropping as self connect";
370 return false;
371 }
372 }
373
374 // Update counts
375 counts_.remove(*slot);
376 slot->state(Slot::connected);
377 counts_.add(*slot);
378 return true;
379 }
380
381 Result
382 activate(SlotImp::ptr const& slot, PublicKey const& key, bool reserved)
383 {
384 beast::WrappedSink sink{m_journal.sink(), slot->prefix()};
385 beast::Journal journal{sink};
386
387 JLOG(journal.debug())
388 << "Logic handshake " << slot->remote_endpoint() << " with "
389 << (reserved ? "reserved " : "") << "key " << key;
390
392
393 // The object must exist in our table
394 XRPL_ASSERT(
395 slots_.find(slot->remote_endpoint()) != slots_.end(),
396 "ripple::PeerFinder::Logic::activate : valid slot input");
397 // Must be accepted or connected
398 XRPL_ASSERT(
399 slot->state() == Slot::accept || slot->state() == Slot::connected,
400 "ripple::PeerFinder::Logic::activate : valid slot state");
401
402 // Check for duplicate connection by key
403 if (keys_.find(key) != keys_.end())
404 return Result::duplicatePeer;
405
406 // If the peer belongs to a cluster or is reserved,
407 // update the slot to reflect that.
408 counts_.remove(*slot);
409 slot->reserved(reserved);
410 counts_.add(*slot);
411
412 // See if we have an open space for this slot
413 if (!counts_.can_activate(*slot))
414 {
415 if (!slot->inbound())
416 bootcache_.on_success(slot->remote_endpoint());
417 if (slot->inbound() && counts_.in_max() == 0)
418 return Result::inboundDisabled;
419 return Result::full;
420 }
421
422 // Set the key right before adding to the map, otherwise we might
423 // assert later when erasing the key.
424 slot->public_key(key);
425 {
426 [[maybe_unused]] bool const inserted = keys_.insert(key).second;
427 // Public key must not already exist
428 XRPL_ASSERT(
429 inserted,
430 "ripple::PeerFinder::Logic::activate : public key inserted");
431 }
432
433 // Change state and update counts
434 counts_.remove(*slot);
435 slot->activate(m_clock.now());
436 counts_.add(*slot);
437
438 if (!slot->inbound())
439 bootcache_.on_success(slot->remote_endpoint());
440
441 // Mark fixed slot success
442 if (slot->fixed() && !slot->inbound())
443 {
444 auto iter(fixed_.find(slot->remote_endpoint()));
445 if (iter == fixed_.end())
447 "PeerFinder::Logic::activate(): remote_endpoint "
448 "missing from fixed_");
449
450 iter->second.success(m_clock.now());
451 JLOG(journal.trace()) << "Logic fixed success";
452 }
453
454 return Result::success;
455 }
456
463 {
465 RedirectHandouts h(slot);
467 handout(&h, (&h) + 1, livecache_.hops.begin(), livecache_.hops.end());
468 return std::move(h.list());
469 }
470
474 // VFALCO TODO This should add the returned addresses to the
475 // squelch list in one go once the list is built,
476 // rather than having each module add to the squelch list.
479 {
481
483
484 // Count how many more outbound attempts to make
485 //
486 auto needed(counts_.attempts_needed());
487 if (needed == 0)
488 return none;
489
490 ConnectHandouts h(needed, m_squelches);
491
492 // Make sure we don't connect to already-connected entries.
493 for (auto const& s : slots_)
494 {
495 auto const result(
496 m_squelches.insert(s.second->remote_endpoint().address()));
497 if (!result.second)
498 m_squelches.touch(result.first);
499 }
500
501 // 1. Use Fixed if:
502 // Fixed active count is below fixed count AND
503 // ( There are eligible fixed addresses to try OR
504 // Any outbound attempts are in progress)
505 //
506 if (counts_.fixed_active() < fixed_.size())
507 {
508 get_fixed(needed, h.list(), m_squelches);
509
510 if (!h.list().empty())
511 {
512 JLOG(m_journal.debug()) << beast::leftw(18) << "Logic connect "
513 << h.list().size() << " fixed";
514 return h.list();
515 }
516
517 if (counts_.attempts() > 0)
518 {
519 JLOG(m_journal.debug())
520 << beast::leftw(18) << "Logic waiting on "
521 << counts_.attempts() << " attempts";
522 return none;
523 }
524 }
525
526 // Only proceed if auto connect is enabled and we
527 // have less than the desired number of outbound slots
528 //
530 return none;
531
532 // 2. Use Livecache if:
533 // There are any entries in the cache OR
534 // Any outbound attempts are in progress
535 //
536 {
538 handout(
539 &h, (&h) + 1, livecache_.hops.rbegin(), livecache_.hops.rend());
540 if (!h.list().empty())
541 {
542 JLOG(m_journal.debug())
543 << beast::leftw(18) << "Logic connect " << h.list().size()
544 << " live "
545 << ((h.list().size() > 1) ? "endpoints" : "endpoint");
546 return h.list();
547 }
548 else if (counts_.attempts() > 0)
549 {
550 JLOG(m_journal.debug())
551 << beast::leftw(18) << "Logic waiting on "
552 << counts_.attempts() << " attempts";
553 return none;
554 }
555 }
556
557 /* 3. Bootcache refill
558 If the Bootcache is empty, try to get addresses from the current
559 set of Sources and add them into the Bootstrap cache.
560
561 Pseudocode:
562 If ( domainNames.count() > 0 AND (
563 unusedBootstrapIPs.count() == 0
564 OR activeNameResolutions.count() > 0) )
565 ForOneOrMore (DomainName that hasn't been resolved recently)
566 Contact DomainName and add entries to the
567 unusedBootstrapIPs return;
568 */
569
570 // 4. Use Bootcache if:
571 // There are any entries we haven't tried lately
572 //
573 for (auto iter(bootcache_.begin());
574 !h.full() && iter != bootcache_.end();
575 ++iter)
576 h.try_insert(*iter);
577
578 if (!h.list().empty())
579 {
580 JLOG(m_journal.debug())
581 << beast::leftw(18) << "Logic connect " << h.list().size()
582 << " boot "
583 << ((h.list().size() > 1) ? "addresses" : "address");
584 return h.list();
585 }
586
587 // If we get here we are stuck
588 return none;
589 }
590
593 {
595 result;
596
598
599 clock_type::time_point const now = m_clock.now();
600 if (m_whenBroadcast <= now)
601 {
603
604 {
605 // build list of active slots
607 slots.reserve(slots_.size());
609 slots_.cbegin(),
610 slots_.cend(),
611 [&slots](Slots::value_type const& value) {
612 if (value.second->state() == Slot::active)
613 slots.emplace_back(value.second);
614 });
615 std::shuffle(slots.begin(), slots.end(), default_prng());
616
617 // build target vector
618 targets.reserve(slots.size());
620 slots.cbegin(),
621 slots.cend(),
622 [&targets](SlotImp::ptr const& slot) {
623 targets.emplace_back(slot);
624 });
625 }
626
627 /* VFALCO NOTE
628 This is a temporary measure. Once we know our own IP
629 address, the correct solution is to put it into the Livecache
630 at hops 0, and go through the regular handout path. This way
631 we avoid handing our address out too frequently, which this code
632 suffers from.
633 */
634 // Add an entry for ourselves if:
635 // 1. We want incoming
636 // 2. We have slots
637 // 3. We haven't failed the firewalled test
638 //
639 if (config_.wantIncoming && counts_.in_max() > 0)
640 {
641 Endpoint ep;
642 ep.hops = 0;
643 // we use the unspecified (0) address here because the value is
644 // irrelevant to recipients. When peers receive an endpoint
645 // with 0 hops, they use the socket remote_addr instead of the
646 // value in the message. Furthermore, since the address value
647 // is ignored, the type/version (ipv4 vs ipv6) doesn't matter
648 // either. ipv6 has a slightly more compact string
649 // representation of 0, so use that for self entries.
652 for (auto& t : targets)
653 t.insert(ep);
654 }
655
656 // build sequence of endpoints by hops
658 handout(
659 targets.begin(),
660 targets.end(),
663
664 // broadcast
665 for (auto const& t : targets)
666 {
667 SlotImp::ptr const& slot = t.slot();
668 auto const& list = t.list();
669 beast::WrappedSink sink{m_journal.sink(), slot->prefix()};
670 beast::Journal journal{sink};
671 JLOG(journal.trace())
672 << "Logic sending " << list.size()
673 << ((list.size() == 1) ? " endpoint" : " endpoints");
674 result.push_back(std::make_pair(slot, list));
675 }
676
678 }
679
680 return result;
681 }
682
683 void
685 {
687
688 // Expire the Livecache
690
691 // Expire the recent cache in each slot
692 for (auto const& entry : slots_)
693 entry.second->expire();
694
695 // Expire the recent attempts table
697
699 }
700
701 //--------------------------------------------------------------------------
702
703 // Validate and clean up the list that we received from the slot.
704 void
706 {
707 bool neighbor(false);
708 for (auto iter = list.begin(); iter != list.end();)
709 {
710 Endpoint& ep(*iter);
711
712 // Enforce hop limit
713 if (ep.hops > Tuning::maxHops)
714 {
715 JLOG(m_journal.debug())
716 << beast::leftw(18) << "Endpoints drop " << ep.address
717 << " for excess hops " << ep.hops;
718 iter = list.erase(iter);
719 continue;
720 }
721
722 // See if we are directly connected
723 if (ep.hops == 0)
724 {
725 if (!neighbor)
726 {
727 // Fill in our neighbors remote address
728 neighbor = true;
729 ep.address =
730 slot->remote_endpoint().at_port(ep.address.port());
731 }
732 else
733 {
734 JLOG(m_journal.debug())
735 << beast::leftw(18) << "Endpoints drop " << ep.address
736 << " for extra self";
737 iter = list.erase(iter);
738 continue;
739 }
740 }
741
742 // Discard invalid addresses
743 if (!is_valid_address(ep.address))
744 {
745 JLOG(m_journal.debug()) << beast::leftw(18) << "Endpoints drop "
746 << ep.address << " as invalid";
747 iter = list.erase(iter);
748 continue;
749 }
750
751 // Filter duplicates
752 if (std::any_of(
753 list.begin(),
754 iter,
755 [ep](Endpoints::value_type const& other) {
756 return ep.address == other.address;
757 }))
758 {
759 JLOG(m_journal.debug()) << beast::leftw(18) << "Endpoints drop "
760 << ep.address << " as duplicate";
761 iter = list.erase(iter);
762 continue;
763 }
764
765 // Increment hop count on the incoming message, so
766 // we store it at the hop count we will send it at.
767 //
768 ++ep.hops;
769
770 ++iter;
771 }
772 }
773
774 void
776 {
777 beast::WrappedSink sink{m_journal.sink(), slot->prefix()};
778 beast::Journal journal{sink};
779
780 // If we're sent too many endpoints, sample them at random:
782 {
783 std::shuffle(list.begin(), list.end(), default_prng());
785 }
786
787 JLOG(journal.trace()) << "Endpoints contained " << list.size()
788 << ((list.size() > 1) ? " entries" : " entry");
789
791
792 // The object must exist in our table
793 XRPL_ASSERT(
794 slots_.find(slot->remote_endpoint()) != slots_.end(),
795 "ripple::PeerFinder::Logic::on_endpoints : valid slot input");
796
797 // Must be handshaked!
798 XRPL_ASSERT(
799 slot->state() == Slot::active,
800 "ripple::PeerFinder::Logic::on_endpoints : valid slot state");
801
803
804 // Limit how often we accept new endpoints
805 if (slot->whenAcceptEndpoints > now)
806 return;
807
808 preprocess(slot, list);
809
810 for (auto const& ep : list)
811 {
812 XRPL_ASSERT(
813 ep.hops,
814 "ripple::PeerFinder::Logic::on_endpoints : nonzero hops");
815
816 slot->recent.insert(ep.address, ep.hops);
817
818 // Note hops has been incremented, so 1
819 // means a directly connected neighbor.
820 //
821 if (ep.hops == 1)
822 {
823 if (slot->connectivityCheckInProgress)
824 {
825 JLOG(journal.debug()) << "Logic testing " << ep.address
826 << " already in progress";
827 continue;
828 }
829
830 if (!slot->checked)
831 {
832 // Mark that a check for this slot is now in progress.
833 slot->connectivityCheckInProgress = true;
834
835 // Test the slot's listening port before
836 // adding it to the livecache for the first time.
837 //
839 ep.address,
840 std::bind(
842 this,
843 slot->remote_endpoint(),
844 ep.address,
845 std::placeholders::_1));
846
847 // Note that we simply discard the first Endpoint
848 // that the neighbor sends when we perform the
849 // listening test. They will just send us another
850 // one in a few seconds.
851
852 continue;
853 }
854
855 // If they failed the test then skip the address
856 if (!slot->canAccept)
857 continue;
858 }
859
860 // We only add to the livecache if the neighbor passed the
861 // listening test, else we silently drop neighbor endpoint
862 // since their listening port is misconfigured.
863 //
864 livecache_.insert(ep);
865 bootcache_.insert(ep.address);
866 }
867
868 slot->whenAcceptEndpoints = now + Tuning::secondsPerMessage;
869 }
870
871 //--------------------------------------------------------------------------
872
873 void
874 remove(SlotImp::ptr const& slot)
875 {
876 {
877 auto const iter = slots_.find(slot->remote_endpoint());
878 // The slot must exist in the table
879 if (iter == slots_.end())
881 "PeerFinder::Logic::remove(): remote_endpoint "
882 "missing from slots_");
883
884 // Remove from slot by IP table
885 slots_.erase(iter);
886 }
887 // Remove the key if present
888 if (slot->public_key() != std::nullopt)
889 {
890 auto const iter = keys_.find(*slot->public_key());
891 // Key must exist
892 if (iter == keys_.end())
894 "PeerFinder::Logic::remove(): public_key missing "
895 "from keys_");
896
897 keys_.erase(iter);
898 }
899 // Remove from connected address table
900 {
901 auto const iter(
902 connectedAddresses_.find(slot->remote_endpoint().address()));
903 // Address must exist
904 if (iter == connectedAddresses_.end())
906 "PeerFinder::Logic::remove(): remote_endpont "
907 "address missing from connectedAddresses_");
908
910 }
911
912 // Update counts
913 counts_.remove(*slot);
914 }
915
916 void
918 {
920
921 remove(slot);
922
923 beast::WrappedSink sink{m_journal.sink(), slot->prefix()};
924 beast::Journal journal{sink};
925
926 // Mark fixed slot failure
927 if (slot->fixed() && !slot->inbound() && slot->state() != Slot::active)
928 {
929 auto iter(fixed_.find(slot->remote_endpoint()));
930 if (iter == fixed_.end())
932 "PeerFinder::Logic::on_closed(): remote_endpont "
933 "missing from fixed_");
934
935 iter->second.failure(m_clock.now());
936 JLOG(journal.debug()) << "Logic fixed failed";
937 }
938
939 // Do state specific bookkeeping
940 switch (slot->state())
941 {
942 case Slot::accept:
943 JLOG(journal.trace()) << "Logic accept failed";
944 break;
945
946 case Slot::connect:
947 case Slot::connected:
948 bootcache_.on_failure(slot->remote_endpoint());
949 // VFALCO TODO If the address exists in the ephemeral/live
950 // endpoint livecache then we should mark the
951 // failure
952 // as if it didn't pass the listening test. We should also
953 // avoid propagating the address.
954 break;
955
956 case Slot::active:
957 JLOG(journal.trace()) << "Logic close";
958 break;
959
960 case Slot::closing:
961 JLOG(journal.trace()) << "Logic finished";
962 break;
963
964 // LCOV_EXCL_START
965 default:
966 UNREACHABLE(
967 "ripple::PeerFinder::Logic::on_closed : invalid slot "
968 "state");
969 break;
970 // LCOV_EXCL_STOP
971 }
972 }
973
974 void
976 {
978
979 bootcache_.on_failure(slot->remote_endpoint());
980 }
981
982 // Insert a set of redirect IP addresses into the Bootcache
983 template <class FwdIter>
984 void
986 FwdIter first,
987 FwdIter last,
988 boost::asio::ip::tcp::endpoint const& remote_address);
989
990 //--------------------------------------------------------------------------
991
992 // Returns `true` if the address matches a fixed slot address
993 // Must have the lock held
994 bool
995 fixed(beast::IP::Endpoint const& endpoint) const
996 {
997 for (auto const& entry : fixed_)
998 if (entry.first == endpoint)
999 return true;
1000 return false;
1001 }
1002
1003 // Returns `true` if the address matches a fixed slot address
1004 // Note that this does not use the port information in the IP::Endpoint
1005 // Must have the lock held
1006 bool
1007 fixed(beast::IP::Address const& address) const
1008 {
1009 for (auto const& entry : fixed_)
1010 if (entry.first.address() == address)
1011 return true;
1012 return false;
1013 }
1014
1015 //--------------------------------------------------------------------------
1016 //
1017 // Connection Strategy
1018 //
1019 //--------------------------------------------------------------------------
1020
1022 template <class Container>
1023 void
1025 std::size_t needed,
1026 Container& c,
1027 typename ConnectHandouts::Squelches& squelches)
1028 {
1029 auto const now(m_clock.now());
1030 for (auto iter = fixed_.begin(); needed && iter != fixed_.end(); ++iter)
1031 {
1032 auto const& address(iter->first.address());
1033 if (iter->second.when() <= now &&
1034 squelches.find(address) == squelches.end() &&
1036 slots_.cbegin(),
1037 slots_.cend(),
1038 [address](Slots::value_type const& v) {
1039 return address == v.first.address();
1040 }))
1041 {
1042 squelches.insert(iter->first.address());
1043 c.push_back(iter->first);
1044 --needed;
1045 }
1046 }
1047 }
1048
1049 //--------------------------------------------------------------------------
1050
1051 void
1053 {
1054 fetch(source);
1055 }
1056
1057 void
1059 {
1060 m_sources.push_back(source);
1061 }
1062
1063 //--------------------------------------------------------------------------
1064 //
1065 // Bootcache livecache sources
1066 //
1067 //--------------------------------------------------------------------------
1068
1069 // Add a set of addresses.
1070 // Returns the number of addresses added.
1071 //
1072 int
1074 {
1075 int count(0);
1077 for (auto addr : list)
1078 {
1079 if (bootcache_.insertStatic(addr))
1080 ++count;
1081 }
1082 return count;
1083 }
1084
1085 // Fetch bootcache addresses from the specified source.
1086 void
1088 {
1089 Source::Results results;
1090
1091 {
1092 {
1094 if (stopping_)
1095 return;
1096 fetchSource_ = source;
1097 }
1098
1099 // VFALCO NOTE The fetch is synchronous,
1100 // not sure if that's a good thing.
1101 //
1102 source->fetch(results, m_journal);
1103
1104 {
1106 if (stopping_)
1107 return;
1108 fetchSource_ = nullptr;
1109 }
1110 }
1111
1112 if (!results.error)
1113 {
1114 int const count(addBootcacheAddresses(results.addresses));
1115 JLOG(m_journal.info())
1116 << beast::leftw(18) << "Logic added " << count << " new "
1117 << ((count == 1) ? "address" : "addresses") << " from "
1118 << source->name();
1119 }
1120 else
1121 {
1122 JLOG(m_journal.error()) << beast::leftw(18) << "Logic failed "
1123 << "'" << source->name() << "' fetch, "
1124 << results.error.message();
1125 }
1126 }
1127
1128 //--------------------------------------------------------------------------
1129 //
1130 // Endpoint message handling
1131 //
1132 //--------------------------------------------------------------------------
1133
1134 // Returns true if the IP::Endpoint contains no invalid data.
1135 bool
1137 {
1138 if (is_unspecified(address))
1139 return false;
1140 if (!is_public(address))
1141 return false;
1142 if (address.port() == 0)
1143 return false;
1144 return true;
1145 }
1146
1147 //--------------------------------------------------------------------------
1148 //
1149 // PropertyStream
1150 //
1151 //--------------------------------------------------------------------------
1152
1153 void
1155 {
1156 for (auto const& entry : slots)
1157 {
1159 SlotImp const& slot(*entry.second);
1160 if (slot.local_endpoint() != std::nullopt)
1161 item["local_address"] = to_string(*slot.local_endpoint());
1162 item["remote_address"] = to_string(slot.remote_endpoint());
1163 if (slot.inbound())
1164 item["inbound"] = "yes";
1165 if (slot.fixed())
1166 item["fixed"] = "yes";
1167 if (slot.reserved())
1168 item["reserved"] = "yes";
1169
1170 item["state"] = stateString(slot.state());
1171 }
1172 }
1173
1174 void
1176 {
1178
1179 // VFALCO NOTE These ugly casts are needed because
1180 // of how std::size_t is declared on some linuxes
1181 //
1182 map["bootcache"] = std::uint32_t(bootcache_.size());
1183 map["fixed"] = std::uint32_t(fixed_.size());
1184
1185 {
1186 beast::PropertyStream::Set child("peers", map);
1187 writeSlots(child, slots_);
1188 }
1189
1190 {
1191 beast::PropertyStream::Map child("counts", map);
1192 counts_.onWrite(child);
1193 }
1194
1195 {
1196 beast::PropertyStream::Map child("config", map);
1197 config_.onWrite(child);
1198 }
1199
1200 {
1201 beast::PropertyStream::Map child("livecache", map);
1202 livecache_.onWrite(child);
1203 }
1204
1205 {
1206 beast::PropertyStream::Map child("bootcache", map);
1207 bootcache_.onWrite(child);
1208 }
1209 }
1210
1211 //--------------------------------------------------------------------------
1212 //
1213 // Diagnostics
1214 //
1215 //--------------------------------------------------------------------------
1216
1217 Counts const&
1218 counts() const
1219 {
1220 return counts_;
1221 }
1222
1223 static std::string
1225 {
1226 switch (state)
1227 {
1228 case Slot::accept:
1229 return "accept";
1230 case Slot::connect:
1231 return "connect";
1232 case Slot::connected:
1233 return "connected";
1234 case Slot::active:
1235 return "active";
1236 case Slot::closing:
1237 return "closing";
1238 default:
1239 break;
1240 };
1241 return "?";
1242 }
1243};
1244
1245//------------------------------------------------------------------------------
1246
1247template <class Checker>
1248template <class FwdIter>
1249void
1251 FwdIter first,
1252 FwdIter last,
1253 boost::asio::ip::tcp::endpoint const& remote_address)
1254{
1255 std::lock_guard _(lock_);
1256 std::size_t n = 0;
1257 for (; first != last && n < Tuning::maxRedirects; ++first, ++n)
1258 bootcache_.insert(beast::IPAddressConversion::from_asio(*first));
1259 if (n > 0)
1260 {
1261 JLOG(m_journal.trace()) << beast::leftw(18) << "Logic add " << n
1262 << " redirect IPs from " << remote_address;
1263 }
1264}
1265
1266} // namespace PeerFinder
1267} // namespace ripple
1268
1269#endif
T any_of(T... args)
T cbegin(T... args)
T bind(T... args)
A version-independent IP address and port combination.
Definition IPEndpoint.h:19
Address const & address() const
Returns the address portion of this endpoint.
Definition IPEndpoint.h:56
Endpoint at_port(Port port) const
Returns a new Endpoint with a different port.
Definition IPEndpoint.h:49
Port port() const
Returns the port number on the endpoint.
Definition IPEndpoint.h:42
A generic endpoint for log messages.
Definition Journal.h:41
Stream error() const
Definition Journal.h:327
Stream debug() const
Definition Journal.h:309
Sink & sink() const
Returns the Sink associated with this Journal.
Definition Journal.h:278
Stream info() const
Definition Journal.h:315
Wraps a Journal::Sink to prefix its output with a string.
Definition WrappedSink.h:15
typename Clock::time_point time_point
virtual time_point now() const =0
Returns the current time.
Associative container where each element is also indexed by time.
auto insert(value_type const &value) -> typename std::enable_if<!maybe_multi, std::pair< iterator, bool > >::type
void touch(beast::detail::aged_container_iterator< is_const, Iterator > pos)
Stores IP addresses useful for gaining initial connections.
Definition Bootcache.h:35
map_type::size_type size() const
Returns the number of entries in the cache.
Definition Bootcache.cpp:31
const_iterator end() const
Definition Bootcache.cpp:49
void periodicActivity()
Stores the cache in the persistent database on a timer.
bool insert(beast::IP::Endpoint const &endpoint)
Add a newly-learned address to the cache.
Definition Bootcache.cpp:93
void on_failure(beast::IP::Endpoint const &endpoint)
Called when an outbound connection attempt fails to handshake.
const_iterator begin() const
IP::Endpoint iterators that traverse in decreasing valence.
Definition Bootcache.cpp:37
void onWrite(beast::PropertyStream::Map &map)
Write the cache state to the property stream.
void on_success(beast::IP::Endpoint const &endpoint)
Called when an outbound connection handshake completes.
bool insertStatic(beast::IP::Endpoint const &endpoint)
Add a statically configured address to the cache.
void load()
Load the persisted data from the Store into the container.
Definition Bootcache.cpp:70
Tests remote listening sockets to make sure they are connectible.
Definition Checker.h:20
void async_connect(beast::IP::Endpoint const &endpoint, Handler &&handler)
Performs an async connection test on the specified endpoint.
Definition Checker.h:187
Receives handouts for making automatic connections.
Definition Handouts.h:253
bool try_insert(beast::IP::Endpoint const &endpoint)
Definition Handouts.h:314
Manages the count of available connections for the various slots.
Definition Counts.h:15
std::size_t fixed_active() const
Returns the number of active fixed connections.
Definition Counts.h:108
int out_active() const
Returns the number of outbound peers assigned an open slot.
Definition Counts.h:94
std::size_t attempts_needed() const
Returns the number of attempts needed to bring us to the max.
Definition Counts.h:69
void onWrite(beast::PropertyStream::Map &map)
Output statistics.
Definition Counts.h:209
int out_max() const
Returns the total number of outbound slots.
Definition Counts.h:85
void add(Slot const &s)
Adds the slot state and properties to the slot counts.
Definition Counts.h:37
int in_max() const
Returns the total number of inbound slots.
Definition Counts.h:147
void remove(Slot const &s)
Removes the slot state and properties from the slot counts.
Definition Counts.h:44
void onConfig(Config const &config)
Called when the config is set or changed.
Definition Counts.h:117
bool can_activate(Slot const &s) const
Returns true if the slot can become active.
Definition Counts.h:51
std::size_t attempts() const
Returns the number of outbound connection attempts.
Definition Counts.h:78
void shuffle()
Shuffle each hop list.
Definition Livecache.h:480
The Livecache holds the short-lived relayed Endpoint messages.
Definition Livecache.h:179
void expire()
Erase entries whose time has expired.
Definition Livecache.h:388
void insert(Endpoint const &ep)
Creates or updates an existing Element based on a new message.
Definition Livecache.h:410
void onWrite(beast::PropertyStream::Map &map)
Output statistics.
Definition Livecache.h:457
class ripple::PeerFinder::Livecache::hops_t hops
The Logic for maintaining the list of Slot addresses.
bool fixed(beast::IP::Endpoint const &endpoint) const
int addBootcacheAddresses(IPAddresses const &list)
void onWrite(beast::PropertyStream::Map &map)
void fetch(std::shared_ptr< Source > const &source)
bool onConnected(SlotImp::ptr const &slot, beast::IP::Endpoint const &local_endpoint)
bool is_valid_address(beast::IP::Endpoint const &address)
std::vector< std::pair< std::shared_ptr< Slot >, std::vector< Endpoint > > > buildEndpointsForPeers()
void on_closed(SlotImp::ptr const &slot)
void addFixedPeer(std::string const &name, std::vector< beast::IP::Endpoint > const &addresses)
void on_failure(SlotImp::ptr const &slot)
void preprocess(SlotImp::ptr const &slot, Endpoints &list)
void checkComplete(beast::IP::Endpoint const &remoteAddress, beast::IP::Endpoint const &checkedAddress, boost::system::error_code ec)
Result activate(SlotImp::ptr const &slot, PublicKey const &key, bool reserved)
bool fixed(beast::IP::Address const &address) const
void remove(SlotImp::ptr const &slot)
std::pair< SlotImp::ptr, Result > new_inbound_slot(beast::IP::Endpoint const &local_endpoint, beast::IP::Endpoint const &remote_endpoint)
std::multiset< beast::IP::Address > connectedAddresses_
void get_fixed(std::size_t needed, Container &c, typename ConnectHandouts::Squelches &squelches)
Adds eligible Fixed addresses for outbound attempts.
std::vector< beast::IP::Endpoint > autoconnect()
Create new outbound connection attempts as needed.
std::vector< std::shared_ptr< Source > > m_sources
void onRedirects(FwdIter first, FwdIter last, boost::asio::ip::tcp::endpoint const &remote_address)
void addSource(std::shared_ptr< Source > const &source)
void on_endpoints(SlotImp::ptr const &slot, Endpoints list)
void addStaticSource(std::shared_ptr< Source > const &source)
Logic(clock_type &clock, Store &store, Checker &checker, beast::Journal journal)
void writeSlots(beast::PropertyStream::Set &set, Slots const &slots)
std::pair< SlotImp::ptr, Result > new_outbound_slot(beast::IP::Endpoint const &remote_endpoint)
void addFixedPeer(std::string const &name, beast::IP::Endpoint const &ep)
std::vector< Endpoint > redirect(SlotImp::ptr const &slot)
Return a list of addresses suitable for redirection.
std::map< beast::IP::Endpoint, Fixed > fixed_
static std::string stateString(Slot::State state)
Receives handouts for redirecting a connection.
Definition Handouts.h:85
std::vector< Endpoint > & list()
Definition Handouts.h:107
std::optional< beast::IP::Endpoint > const & local_endpoint() const override
The local endpoint of the socket, when known.
Definition SlotImp.h:64
bool fixed() const override
Returns true if this is a fixed connection.
Definition SlotImp.h:40
bool inbound() const override
Returns true if this is an inbound connection.
Definition SlotImp.h:34
std::shared_ptr< SlotImp > ptr
Definition SlotImp.h:18
void set_listening_port(std::uint16_t port)
Definition SlotImp.h:91
State state() const override
Returns the state of the connection.
Definition SlotImp.h:52
std::string prefix() const
Definition SlotImp.h:76
beast::IP::Endpoint const & remote_endpoint() const override
The remote endpoint of socket.
Definition SlotImp.h:58
bool reserved() const override
Returns true if this is a reserved connection.
Definition SlotImp.h:46
Abstract persistence for PeerFinder data.
Definition Store.h:9
A public key.
Definition PublicKey.h:43
T count(T... args)
T emplace(T... args)
T empty(T... args)
T end(T... args)
T erase(T... args)
T find(T... args)
T for_each(T... args)
T forward_as_tuple(T... args)
T is_same_v
T make_pair(T... args)
T make_tuple(T... args)
boost::asio::ip::address_v6 AddressV6
Definition IPAddressV6.h:11
boost::asio::ip::address Address
Definition IPAddress.h:20
std::enable_if< is_aged_container< AgedContainer >::value, std::size_t >::type expire(AgedContainer &c, std::chrono::duration< Rep, Period > const &age)
Expire aged container items past the specified age.
std::chrono::seconds constexpr secondsPerMessage(151)
std::chrono::seconds constexpr recentAttemptDuration(60)
std::string_view to_string(Result result) noexcept
Converts a Result enum value to its string representation.
Result
Possible results from activating a slot.
void handout(TargetFwdIter first, TargetFwdIter last, SeqFwdIter seq_first, SeqFwdIter seq_last)
Distributes objects to targets according to business rules.
Definition Handouts.h:49
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section. If the named value is not found or doesn't parse as a T,...
void LogicError(std::string const &how) noexcept
Called when faulty logic causes a broken invariant.
beast::xor_shift_engine & default_prng()
Return the default random engine.
T piecewise_construct
T push_back(T... args)
T shuffle(T... args)
T ref(T... args)
T reserve(T... args)
T resize(T... args)
T size(T... args)
static IP::Endpoint from_asio(boost::asio::ip::address const &address)
Left justifies a field at the specified width.
Definition iosformat.h:15
PeerFinder configuration settings.
void onWrite(beast::PropertyStream::Map &map)
Write the configuration into a property stream.
bool autoConnect
true if we want to establish connections automatically
int ipLimit
Limit how many incoming connections we allow per IP.
bool wantIncoming
true if we want to accept incoming connections.
std::uint16_t listeningPort
The listening port number.
Describes a connectible peer address along with some metadata.
The results of a fetch.
Definition Source.h:24
boost::system::error_code error
Definition Source.h:28