rippled
Loading...
Searching...
No Matches
ValidatorSite.cpp
1#include <xrpld/app/misc/ValidatorList.h>
2#include <xrpld/app/misc/ValidatorSite.h>
3#include <xrpld/app/misc/detail/WorkFile.h>
4#include <xrpld/app/misc/detail/WorkPlain.h>
5#include <xrpld/app/misc/detail/WorkSSL.h>
6
7#include <xrpl/json/json_reader.h>
8#include <xrpl/protocol/digest.h>
9#include <xrpl/protocol/jss.h>
10
11#include <algorithm>
12
13namespace xrpl {
14
// Redirect limit for a validator-list fetch: the redirect handler in this
// file throws "max redirects" once a site's redirCount equals this value.
17unsigned short constexpr max_redirects = 3;
18
// NOTE(review): the signature line for this body is absent from this listing
// (embedded numbering skips 19). The body reads `uri` and fills `pUrl`;
// presumably this is the Site::Resource constructor (cf. the
// make_shared<Site::Resource>(...) call in the redirect handler) - confirm.
//
// Parses `uri` into `pUrl` and enforces per-scheme rules:
//   file  - must not carry a hostname, must carry a path
//   http  - must carry a hostname; port defaults to 80 when unset
//   https - must carry a hostname; port defaults to 443 when unset
// An unparsable URI or any other scheme throws std::runtime_error.
20{
21 if (!parseUrl(pUrl, uri))
22 throw std::runtime_error("URI '" + uri + "' cannot be parsed");
23
24 if (pUrl.scheme == "file")
25 {
26 if (!pUrl.domain.empty())
27 throw std::runtime_error("file URI cannot contain a hostname");
28
29#if BOOST_OS_WINDOWS
30 // Paths on Windows need the leading / removed
31 if (pUrl.path[0] == '/')
// NOTE(review): the statement controlled by the `if` above is missing from
// this listing (embedded numbering skips 32). As written, the `if` would
// govern the path-empty check below on Windows builds - restore the line.
33#endif
34
35 if (pUrl.path.empty())
36 throw std::runtime_error("file URI must contain a path");
37 }
38 else if (pUrl.scheme == "http")
39 {
40 if (pUrl.domain.empty())
41 throw std::runtime_error("http URI must contain a hostname");
42
// Fill in the scheme's default port only when the URI did not specify one.
43 if (!pUrl.port)
44 pUrl.port = 80;
45 }
46 else if (pUrl.scheme == "https")
47 {
48 if (pUrl.domain.empty())
49 throw std::runtime_error("https URI must contain a hostname");
50
51 if (!pUrl.port)
52 pUrl.port = 443;
53 }
54 else
55 {
56 throw std::runtime_error("Unsupported scheme: '" + pUrl.scheme + "'");
57 }
58}
59
68
// Constructor fragment. The tooltip text later in this listing gives the
// declaration as
//   ValidatorSite(Application& app,
//                 std::optional<beast::Journal> j = std::nullopt,
//                 std::chrono::seconds timeout = std::chrono::seconds{20}).
// NOTE(review): the lines naming the constructor and the `j` / `timeout`
// parameters are absent here (embedded numbering skips 69, 71 and 72).
70 Application& app,
73 : app_{app}
// Use the caller's journal when one is supplied, otherwise the
// application's "ValidatorSite" journal.
74 , j_{j ? *j : app_.getJournal("ValidatorSite")}
// The refresh/timeout timer runs on the application's io_context.
75 , timer_{app_.getIOContext()}
// All three state flags start cleared.
76 , fetching_{false}
77 , pending_{false}
78 , stopping_{false}
// Per-request time budget, later used to arm the timer in makeRequest.
79 , requestTimeout_{timeout}
80{
81}
82
// NOTE(review): the signature line and the declaration of `lock` are absent
// from this listing (embedded numbering skips 83 and 85). Presumably this is
// the destructor and `lock` is a std::unique_lock over state_mutex_ - confirm.
//
// If the timer has been given an expiry later than the clock's epoch:
//   - when no stop is in progress, release the lock and call stop();
//   - otherwise block until fetching_ becomes false.
84{
86 if (timer_.expiry() > clock_type::time_point{})
87 {
88 if (!stopping_)
89 {
// Unlock first; presumably stop() acquires the same mutex itself - confirm.
90 lock.unlock();
91 stop();
92 }
93 else
94 {
95 cv_.wait(lock, [&] { return !fetching_; });
96 }
97 }
98}
99
// Fallback for when no sites are configured or a site failed to load: asks
// the ValidatorList for its locally cached list files (loadLists()) and
// loads them as sites.
// Returns true when there is nothing to load or when load() succeeds.
// NOTE(review): the signature line is absent (embedded numbering skips 101);
// the tooltip text in this listing gives
// bool missingSite(std::lock_guard<std::mutex> const&), and the body names
// that parameter `lock_sites`.
100bool
102{
103 auto const sites = app_.getValidators().loadLists();
104 return sites.empty() || load(sites, lock_sites);
105}
106
// Public entry point for loading configured site URIs: logs, takes
// sites_mutex_, and delegates to the overload that expects the lock held.
// Returns whatever that overload returns.
// NOTE(review): the signature line is absent (embedded numbering skips 108);
// the tooltip text gives bool load(std::vector<std::string> const& siteURIs).
107bool
109{
110 JLOG(j_.debug()) << "Loading configured validator list sites";
111
112 std::lock_guard const lock{sites_mutex_};
113
114 return load(siteURIs, lock);
115}
116
// Locked overload: appends one Site per URI to sites_.
// NOTE(review): the line carrying the function name is absent (embedded
// numbering skips 118).
//
// siteURIs   - URIs to add; an empty list is routed to missingSite().
// lock_sites - proof that the caller holds the sites mutex.
// Returns false as soon as one URI is rejected (Site construction throws),
// true otherwise. Sites appended before the rejected URI stay in sites_;
// there is no rollback.
117bool
119 std::vector<std::string> const& siteURIs,
120 std::lock_guard<std::mutex> const& lock_sites)
121{
122 // If no sites are provided, act as if a site failed to load.
123 if (siteURIs.empty())
124 {
125 return missingSite(lock_sites);
126 }
127
128 for (auto const& uri : siteURIs)
129 {
130 try
131 {
132 sites_.emplace_back(uri);
133 }
134 catch (std::exception const& e)
135 {
136 JLOG(j_.error()) << "Invalid validator site uri: " << uri << ": " << e.what();
137 return false;
138 }
139 }
140
141 JLOG(j_.debug()) << "Loaded " << siteURIs.size() << " sites";
142
143 return true;
144}
145
// Arms the refresh timer, but only if its expiry is still the clock's epoch
// value (i.e. it has not been given an expiry yet).
// NOTE(review): the signature line and the declarations of `l0` / `l1` are
// absent (embedded numbering skips 147, 149 and 150). setTimer() expects the
// sites lock and the state lock, so presumably these are lock_guards over
// sites_mutex_ and state_mutex_ - confirm. The tooltip text gives void start().
146void
148{
151 if (timer_.expiry() == clock_type::time_point{})
152 setTimer(l0, l1);
153}
154
// Blocks the caller on cv_ until pending_ is false.
// NOTE(review): the signature line and the declaration of `lock` are absent
// (embedded numbering skips 156 and 158); the tooltip text gives void join().
155void
157{
159 cv_.wait(lock, [&] { return !pending_; });
160}
161
// Shuts fetching down: raises stopping_, cancels any in-flight work, waits
// for the current fetch to finish, cancels the timer, then clears stopping_
// and pending_ and wakes every waiter on cv_.
// NOTE(review): the signature line and the declaration of `lock` are absent
// (embedded numbering skips 163 and 165); the tooltip text gives void stop().
162void
164{
// While set, the fetch completion handlers skip re-arming the timer.
166 stopping_ = true;
167 // work::cancel() must be called before the
168 // cv wait in order to kick any asio async operations
169 // that might be pending.
170 if (auto sp = work_.lock())
171 sp->cancel();
172 cv_.wait(lock, [&] { return !fetching_; });
173
174 // docs indicate cancel() can throw, but this should be
175 // reconsidered if it changes to noexcept
176 try
177 {
178 timer_.cancel();
179 }
180 catch (boost::system::system_error const&) // NOLINT(bugprone-empty-catch)
181 {
182 }
// Reset state so waiters in join() (which waits on !pending_) are released.
183 stopping_ = false;
184 pending_ = false;
185 cv_.notify_all();
186}
187
// Schedules the next refresh: picks the site with the earliest nextRefresh,
// records in pending_ whether that time has already passed, and arms the
// timer to invoke onTimer() with that site's index. Does nothing when
// sites_ is empty.
// NOTE(review): the line carrying the function name is absent (embedded
// numbering skips 189); the tooltip text gives setTimer and states that
// locks over both mutexes are required.
//
// site_lock / state_lock - unused in the body; they document that the caller
// holds both locks.
188void
190 std::lock_guard<std::mutex> const& site_lock,
191 std::lock_guard<std::mutex> const& state_lock)
192{
193 auto next = std::min_element(sites_.begin(), sites_.end(), [](Site const& a, Site const& b) {
194 return a.nextRefresh < b.nextRefresh;
195 });
196
197 if (next != sites_.end())
198 {
199 pending_ = next->nextRefresh <= clock_type::now();
200 cv_.notify_all();
201 timer_.expires_at(next->nextRefresh);
// Capture the index rather than the iterator for use in the async handler.
202 auto idx = std::distance(sites_.begin(), next);
203 timer_.async_wait(
204 [this, idx](boost::system::error_code const& ec) { this->onTimer(idx, ec); });
205 }
206}
207
// Starts an asynchronous fetch of `resource` for site `siteIdx` and arms the
// timer as a request timeout.
// NOTE(review): several lines are absent from this listing (embedded
// numbering skips 209, 210, 216, 248, 252, 261, 265 and 273): the function
// name, the `resource` parameter, the declaration of `sp`, and the heads of
// the three work-object constructions. The tooltip text gives
// makeRequest(std::shared_ptr<Site::Resource> resource, std::size_t siteIdx,
// std::lock_guard<std::mutex> const&).
//
// siteIdx    - index into sites_ of the site being fetched.
// sites_lock - unused in the body; documents that the sites mutex is held.
208void
211 std::size_t siteIdx,
212 std::lock_guard<std::mutex> const& sites_lock)
213{
214 fetching_ = true;
215 sites_[siteIdx].activeResource = resource;
// Shared by both completion handlers: cancels one pending wait on the timer
// (the request timeout armed at the end of this function).
217 auto timeoutCancel = [this]() {
218 std::lock_guard const lock_state{state_mutex_};
219 // docs indicate cancel_one() can throw, but this
220 // should be reconsidered if it changes to noexcept
221 try
222 {
223 timer_.cancel_one();
224 }
225 catch (boost::system::system_error const&) // NOLINT(bugprone-empty-catch)
226 {
227 }
228 };
// Completion handler for http/https fetches.
229 auto onFetch = [this, siteIdx, timeoutCancel](
230 error_code const& err,
231 endpoint_type const& endpoint,
232 detail::response_type const& resp) {
233 timeoutCancel();
234 onSiteFetch(err, endpoint, resp, siteIdx);
235 };
236
// Completion handler for file fetches, which deliver the raw text only.
237 auto onFetchFile = [this, siteIdx, timeoutCancel](
238 error_code const& err, std::string const& resp) {
239 timeoutCancel();
240 onTextFetch(err, resp, siteIdx);
241 };
242
243 JLOG(j_.debug()) << "Starting request for " << resource->uri;
244
245 if (resource->pUrl.scheme == "https")
246 {
247 // can throw...
// NOTE(review): head of the construction is missing here (presumably a
// make_shared of the SSL work type assigned to `sp`) - confirm.
249 resource->pUrl.domain,
250 resource->pUrl.path,
251 std::to_string(*resource->pUrl.port),
253 j_,
254 app_.config(),
255 sites_[siteIdx].lastRequestEndpoint,
256 sites_[siteIdx].lastRequestSuccessful,
257 onFetch);
258 }
259 else if (resource->pUrl.scheme == "http")
260 {
// NOTE(review): head of the construction is missing here as well.
262 resource->pUrl.domain,
263 resource->pUrl.path,
264 std::to_string(*resource->pUrl.port),
266 sites_[siteIdx].lastRequestEndpoint,
267 sites_[siteIdx].lastRequestSuccessful,
268 onFetch);
269 }
270 else
271 {
272 BOOST_ASSERT(resource->pUrl.scheme == "file");
// NOTE(review): head of the construction is missing here as well.
274 resource->pUrl.path, app_.getIOContext(), onFetchFile);
275 }
276
// Cleared before the request runs; the completion handlers set it back to
// true on a successful fetch.
277 sites_[siteIdx].lastRequestSuccessful = false;
// work_ is a weak_ptr, so it observes the request without owning it.
278 work_ = sp;
279 sp->run();
280 // start a timer for the request, which shouldn't take more
281 // than requestTimeout_ to complete
282 std::lock_guard const lock_state{state_mutex_};
283 timer_.expires_after(requestTimeout_);
284 timer_.async_wait([this, siteIdx](boost::system::error_code const& ec) {
285 this->onRequestTimeout(siteIdx, ec);
286 });
287}
288
// Timer handler armed by makeRequest(): when the wait completes without
// error, the request is considered too slow, a message is logged, and the
// in-flight work (if still alive) is cancelled.
// NOTE(review): the signature line is absent (embedded numbering skips 290);
// the tooltip text gives
// onRequestTimeout(std::size_t siteIdx, error_code const& ec).
289void
291{
// Any error code means the wait did not run to completion (for example it
// was cancelled by a completion handler), so there is nothing to do.
292 if (ec)
293 return;
294
295 {
296 std::lock_guard const lock_site{sites_mutex_};
297 // In some circumstances, both this function and the response
298 // handler (onSiteFetch or onTextFetch) can get queued and
299 // processed. In all observed cases, the response handler
300 // processes a network error. Usually, this function runs first,
301 // but on extremely rare occasions, the response handler can run
302 // first, which will leave activeResource empty.
303 auto const& site = sites_[siteIdx];
304 if (site.activeResource)
305 {
306 JLOG(j_.warn()) << "Request for " << site.activeResource->uri << " took too long";
307 }
308 else
309 JLOG(j_.error()) << "Request took too long, but a response has "
310 "already been processed";
311 }
312
// The sites lock is released above before the state lock is taken here.
313 std::lock_guard const lock_state{state_mutex_};
314 if (auto sp = work_.lock())
315 sp->cancel();
316}
317
// Refresh-timer handler armed by setTimer(): starts a fetch of the site's
// startingResource, or reports the failure through the fetch-completion path.
// NOTE(review): the signature line is absent (embedded numbering skips 319);
// the tooltip text gives onTimer(std::size_t siteIdx, error_code const& ec).
318void
320{
321 if (ec)
322 {
323 // Restart the timer if any errors are encountered, unless the error
324 // is from the wait operation being aborted due to a shutdown request.
325 if (ec != boost::asio::error::operation_aborted)
326 onSiteFetch(ec, {}, detail::response_type{}, siteIdx);
327 return;
328 }
329
330 try
331 {
332 std::lock_guard const lock{sites_mutex_};
// Push the next refresh out by one interval and start a fresh redirect count.
333 sites_[siteIdx].nextRefresh = clock_type::now() + sites_[siteIdx].refreshInterval;
334 sites_[siteIdx].redirCount = 0;
335 // the WorkSSL client ctor can throw if SSL init fails
336 makeRequest(sites_[siteIdx].startingResource, siteIdx, lock);
337 }
338 catch (std::exception const& ex)
339 {
340 JLOG(j_.error()) << "Exception in " << __func__ << ": " << ex.what();
// NOTE(review): the head of the call below and one of its arguments are
// missing (embedded numbering skips 341 and 344). The visible arguments are
// a generic error code -1, an empty endpoint and the site index - presumably
// a call to onSiteFetch; confirm.
342 boost::system::error_code{-1, boost::system::generic_category()},
343 {},
345 siteIdx);
346 }
347}
348
// Parses a validator-list response body, hands it to the ValidatorList, and
// records the outcome on the site.
// NOTE(review): several lines are absent from this listing (embedded
// numbering skips 350, 374, 403, 404, 458 and every `case` label in the
// switch): the function name, the declaration of `blobs` inside the
// validation lambda, two arguments of applyListsAndBroadcast, the upper
// bound of the clamp, and the disposition labels.
//
// res        - raw response text, expected to be JSON.
// siteIdx    - index into sites_ of the site that produced the response.
// sites_lock - unused in the body; documents that the sites mutex is held.
// Throws std::runtime_error("bad json") when the text does not parse, and
// std::runtime_error("missing fields") when required members are absent or
// no blobs could be extracted.
349void
351 std::string const& res,
352 std::size_t siteIdx,
353 std::lock_guard<std::mutex> const& sites_lock)
354{
355 Json::Value const body = [&res, siteIdx, this]() {
356 Json::Reader r;
357 Json::Value body;
358 if (!r.parse(res.data(), body))
359 {
360 JLOG(j_.warn()) << "Unable to parse JSON response from "
361 << sites_[siteIdx].activeResource->uri;
362 throw std::runtime_error{"bad json"};
363 }
364 return body;
365 }();
366
367 auto const [valid, version, blobs] = [&body]() {
368 // Check the easy fields first
369 bool valid = body.isObject() && body.isMember(jss::manifest) &&
370 body[jss::manifest].isString() && body.isMember(jss::version) &&
371 body[jss::version].isInt();
372 // Check the version-specific blob & signature fields
373 std::uint32_t version = 0;
375 if (valid)
376 {
377 version = body[jss::version].asUInt();
378 blobs = ValidatorList::parseBlobs(version, body);
// A well-formed envelope that yields no blobs is still treated as invalid.
379 valid = !blobs.empty();
380 }
381 return std::make_tuple(valid, version, blobs);
382 }();
383
384 if (!valid)
385 {
386 JLOG(j_.warn()) << "Missing fields in JSON response from "
387 << sites_[siteIdx].activeResource->uri;
388 throw std::runtime_error{"missing fields"};
389 }
390
391 auto const manifest = body[jss::manifest].asString();
392 XRPL_ASSERT(
393 version == body[jss::version].asUInt(),
394 "xrpl::ValidatorSite::parseJsonResponse : version match");
395 auto const& uri = sites_[siteIdx].activeResource->uri;
// The hash covers manifest, blobs and version, and is passed along with them.
396 auto const hash = sha512Half(manifest, blobs, version);
397 auto const applyResult = app_.getValidators().applyListsAndBroadcast(
398 manifest,
399 version,
400 blobs,
401 uri,
402 hash,
405 app_.getOPs());
406
// Record the refresh time and best disposition with an empty message.
407 sites_[siteIdx].lastRefreshStatus.emplace(
408 Site::Status{clock_type::now(), applyResult.bestDisposition(), ""});
409
// Log one line per disposition returned, with its count.
410 for (auto const& [disp, count] : applyResult.dispositions)
411 {
412 switch (disp)
413 {
415 JLOG(j_.debug()) << "Applied " << count << " new validator list(s) from " << uri;
416 break;
418 JLOG(j_.debug()) << "Applied " << count << " expired validator list(s) from "
419 << uri;
420 break;
422 JLOG(j_.debug()) << "Ignored " << count
423 << " validator list(s) with current sequence from " << uri;
424 break;
426 JLOG(j_.debug()) << "Processed " << count << " future validator list(s) from "
427 << uri;
428 break;
430 JLOG(j_.debug()) << "Ignored " << count
431 << " validator list(s) with future known sequence from " << uri;
432 break;
// NOTE(review): the literal below lacks the leading space every sibling
// message has, so the log reads e.g. "Ignored 3stale validator list(s)".
434 JLOG(j_.warn()) << "Ignored " << count << "stale validator list(s) from " << uri;
435 break;
437 JLOG(j_.warn()) << "Ignored " << count << " untrusted validator list(s) from "
438 << uri;
439 break;
441 JLOG(j_.warn()) << "Ignored " << count << " invalid validator list(s) from " << uri;
442 break;
444 JLOG(j_.warn()) << "Ignored " << count
445 << " unsupported version validator list(s) from " << uri;
446 break;
447 default:
448 BOOST_ASSERT(false);
449 }
450 }
451
// An optional numeric refresh_interval (in minutes) from the response
// overrides the site's interval, clamped to at least one minute.
// NOTE(review): isNumeric() is a wider check than the asUInt() conversion
// below; verify how asUInt() treats negative or fractional values.
452 if (body.isMember(jss::refresh_interval) && body[jss::refresh_interval].isNumeric())
453 {
454 using namespace std::chrono_literals;
455 std::chrono::minutes const refresh = std::clamp(
456 std::chrono::minutes{body[jss::refresh_interval].asUInt()},
457 1min,
459 sites_[siteIdx].refreshInterval = refresh;
460 sites_[siteIdx].nextRefresh = clock_type::now() + sites_[siteIdx].refreshInterval;
461 }
462}
463
// Interprets a redirect response and returns the resource to fetch next.
// NOTE(review): the return type, the function name and the declaration of
// `newLocation` are absent from this listing (embedded numbering skips 464,
// 465 and 471). The tooltip text gives
// std::shared_ptr<Site::Resource> processRedirect(
//     detail::response_type const& res, std::size_t siteIdx,
//     std::lock_guard<std::mutex> const&).
//
// res        - the redirect response; its Location field is the target.
// siteIdx    - index into sites_ of the site being fetched.
// sites_lock - unused in the body; documents that the sites mutex is held.
// Throws std::runtime_error when Location is missing or empty, when the
// redirect limit has been reached, or when the target is not http/https;
// exceptions from constructing the new resource are logged and rethrown.
466 detail::response_type const& res,
467 std::size_t siteIdx,
468 std::lock_guard<std::mutex> const& sites_lock)
469{
470 using namespace boost::beast::http;
472 if (!res.contains(field::location) || res[field::location].empty())
473 {
474 JLOG(j_.warn()) << "Request for validator list at " << sites_[siteIdx].activeResource->uri
475 << " returned a redirect with no Location.";
476 throw std::runtime_error{"missing location"};
477 }
478
479 if (sites_[siteIdx].redirCount == max_redirects)
480 {
481 JLOG(j_.warn()) << "Exceeded max redirects for validator list at "
482 << sites_[siteIdx].loadedResource->uri;
483 throw std::runtime_error{"max redirects"};
484 }
485
486 JLOG(j_.debug()) << "Got redirect for validator list from "
487 << sites_[siteIdx].activeResource->uri << " to new location "
488 << res[field::location];
489
490 try
491 {
492 newLocation = std::make_shared<Site::Resource>(std::string(res[field::location]));
// Counted only after the target was constructed, so a Location that fails
// to construct does not consume a redirect.
493 ++sites_[siteIdx].redirCount;
494 if (newLocation->pUrl.scheme != "http" && newLocation->pUrl.scheme != "https")
495 throw std::runtime_error("invalid scheme in redirect " + newLocation->pUrl.scheme);
496 }
// NOTE(review): `ex` is not used in this handler; only the Location is logged.
497 catch (std::exception const& ex)
498 {
499 JLOG(j_.error()) << "Invalid redirect location: " << res[field::location];
500 throw;
501 }
502 return newLocation;
503}
504
// Completion handler for http/https fetches: records the endpoint, then
// either parses the list, follows a redirect, or records an error; finally
// clears fetching_ and re-arms the refresh timer unless a stop is underway.
// NOTE(review): the line carrying the function name and one line inside the
// onError lambda are absent (embedded numbering skips 506 and 520); the
// tooltip text gives onSiteFetch(boost::system::error_code const& ec,
// endpoint_type const& endpoint, detail::response_type const& res,
// std::size_t siteIdx).
//
// ec       - transport result of the fetch.
// endpoint - remote endpoint used; a default-constructed value is ignored.
// res      - the HTTP response (only meaningful when ec is clear).
// siteIdx  - index into sites_ of the site that was fetched.
505void
507 boost::system::error_code const& ec,
508 endpoint_type const& endpoint,
509 detail::response_type const& res,
510 std::size_t siteIdx)
511{
512 std::lock_guard lock_sites{sites_mutex_};
513 {
514 if (endpoint != endpoint_type{})
515 sites_[siteIdx].lastRequestEndpoint = endpoint;
// NOTE(review): activeResource is dereferenced unconditionally here and in
// the messages below. onTimer's error path calls this function without a
// preceding makeRequest(), and activeResource is reset at the end of every
// fetch, so it may be empty on that path - confirm and guard if so.
516 JLOG(j_.debug()) << "Got completion for " << sites_[siteIdx].activeResource->uri << " "
517 << endpoint;
// Records a failure status on the site; `retry` additionally schedules the
// next attempt after error_retry_interval. Either way, falls back to the
// locally cached lists via missingSite().
518 auto onError = [&](std::string const& errMsg, bool retry) {
519 sites_[siteIdx].lastRefreshStatus.emplace(
521 if (retry)
522 sites_[siteIdx].nextRefresh = clock_type::now() + error_retry_interval;
523
524 // See if there's a copy saved locally from last time we
525 // saw the list.
526 missingSite(lock_sites);
527 };
528 if (ec)
529 {
530 JLOG(j_.warn()) << "Problem retrieving from " << sites_[siteIdx].activeResource->uri
531 << " " << endpoint << " " << ec.value() << ":" << ec.message();
532 onError("fetch error", true);
533 }
534 else
535 {
536 try
537 {
538 using namespace boost::beast::http;
539 switch (res.result())
540 {
541 case status::ok:
542 sites_[siteIdx].lastRequestSuccessful = true;
543 parseJsonResponse(res.body(), siteIdx, lock_sites);
544 break;
545 case status::moved_permanently:
546 case status::permanent_redirect:
547 case status::found:
548 case status::temporary_redirect: {
549 auto newLocation = processRedirect(res, siteIdx, lock_sites);
550 XRPL_ASSERT(
551 newLocation,
552 "xrpl::ValidatorSite::onSiteFetch : non-null "
553 "validator");
554 // for perm redirects, also update our starting URI
555 if (res.result() == status::moved_permanently ||
556 res.result() == status::permanent_redirect)
557 {
558 sites_[siteIdx].startingResource = newLocation;
559 }
560 makeRequest(newLocation, siteIdx, lock_sites);
561 return; // we are still fetching, so skip
562 // state update/notify below
563 }
564 default: {
565 JLOG(j_.warn()) << "Request for validator list at "
566 << sites_[siteIdx].activeResource->uri << " " << endpoint
567 << " returned bad status: " << res.result_int();
568 onError("bad result code", true);
569 }
570 }
571 }
// Parse/redirect failures are recorded without scheduling an early retry.
572 catch (std::exception const& ex)
573 {
574 JLOG(j_.error()) << "Exception in " << __func__ << ": " << ex.what();
575 onError(ex.what(), false);
576 }
577 }
// The fetch is over; drop the active resource so late timeout handlers can
// tell a response was already processed.
578 sites_[siteIdx].activeResource.reset();
579 }
580
581 std::lock_guard const lock_state{state_mutex_};
582 fetching_ = false;
583 if (!stopping_)
584 setTimer(lock_sites, lock_state);
585 cv_.notify_all();
586}
587
// Completion handler for fetches that deliver raw text (makeRequest() wires
// it to the file scheme): parses the text as a validator list or records the
// failure, then clears fetching_ and re-arms the refresh timer unless a stop
// is underway.
// NOTE(review): the line carrying the function name and the argument line of
// the emplace call in the catch block are absent (embedded numbering skips
// 589 and 613); the tooltip text gives
// onTextFetch(boost::system::error_code const& ec, std::string const& res,
// std::size_t siteIdx).
//
// ec      - result of the fetch; a set code is turned into "fetch error".
// res     - the fetched text.
// siteIdx - index into sites_ of the site that was fetched.
588void
590 boost::system::error_code const& ec,
591 std::string const& res,
592 std::size_t siteIdx)
593{
594 std::lock_guard const lock_sites{sites_mutex_};
595 {
596 try
597 {
598 if (ec)
599 {
600 JLOG(j_.warn()) << "Problem retrieving from " << sites_[siteIdx].activeResource->uri
601 << " " << ec.value() << ": " << ec.message();
// Routed through the catch block below so both failure kinds share one path.
602 throw std::runtime_error{"fetch error"};
603 }
604
605 sites_[siteIdx].lastRequestSuccessful = true;
606
607 parseJsonResponse(res, siteIdx, lock_sites);
608 }
609 catch (std::exception const& ex)
610 {
611 JLOG(j_.error()) << "Exception in " << __func__ << ": " << ex.what();
612 sites_[siteIdx].lastRefreshStatus.emplace(
614 }
615 sites_[siteIdx].activeResource.reset();
616 }
617
618 std::lock_guard const lock_state{state_mutex_};
619 fetching_ = false;
620 if (!stopping_)
621 setTimer(lock_sites, lock_state);
622 cv_.notify_all();
623}
624
// Builds a JSON report with one entry per configured site under
// jss::validator_sites, taken under sites_mutex_.
// NOTE(review): the signature and the declarations of `jrr`, `v` and `uri`
// are absent from this listing (embedded numbering skips 625, 626, 631, 637
// and 638); the tooltip text gives Json::Value getJson() const. From their
// use below, `uri` is presumably a string stream and `v` the per-site JSON
// object appended to jSites - confirm.
627{
628 using namespace std::chrono;
629 using Int = Json::Value::Int;
630
632 Json::Value& jSites = (jrr[jss::validator_sites] = Json::arrayValue);
633 {
634 std::lock_guard const lock{sites_mutex_};
635 for (Site const& site : sites_)
636 {
// Report the URI as loaded, noting the current starting resource when it
// differs (the fetch handler replaces it on permanent redirects).
639 uri << site.loadedResource->uri;
640 if (site.loadedResource != site.startingResource)
641 uri << " (redirects to " << site.startingResource->uri + ")";
642 v[jss::uri] = uri.str();
643 v[jss::next_refresh_time] = to_string(site.nextRefresh);
// Status fields appear only once a refresh has been attempted; the message
// is included only when non-empty.
644 if (site.lastRefreshStatus)
645 {
646 v[jss::last_refresh_time] = to_string(site.lastRefreshStatus->refreshed);
647 v[jss::last_refresh_status] = to_string(site.lastRefreshStatus->disposition);
648 if (!site.lastRefreshStatus->message.empty())
649 v[jss::last_refresh_message] = site.lastRefreshStatus->message;
650 }
651 v[jss::refresh_interval_min] = static_cast<Int>(site.refreshInterval.count());
652 }
653 }
654 return jrr;
655}
656} // namespace xrpl
T clamp(T... args)
Unserialize a JSON document into a Value.
Definition json_reader.h:17
bool parse(std::string const &document, Value &root)
Read a Value from a JSON document.
Represents a JSON value.
Definition json_value.h:130
Value & append(Value const &value)
Append value to array at the end.
bool isString() const
UInt asUInt() const
bool isObject() const
Json::Int Int
Definition json_value.h:138
std::string asString() const
Returns the unquoted string value.
bool isMember(char const *key) const
Return true if the object has a member named key.
bool isNumeric() const
bool isInt() const
Stream error() const
Definition Journal.h:319
Stream debug() const
Definition Journal.h:301
Stream warn() const
Definition Journal.h:313
virtual Config & config()=0
virtual NetworkOPs & getOPs()=0
virtual ValidatorList & getValidators()=0
virtual HashRouter & getHashRouter()=0
virtual boost::asio::io_context & getIOContext()=0
virtual Overlay & getOverlay()=0
std::vector< std::string > loadLists()
static std::vector< ValidatorBlobInfo > parseBlobs(std::uint32_t version, Json::Value const &body)
Pull the blob/signature/manifest information out of the appropriate Json body fields depending on the...
PublisherListStats applyListsAndBroadcast(std::string const &manifest, std::uint32_t version, std::vector< ValidatorBlobInfo > const &blobs, std::string siteUri, uint256 const &hash, Overlay &overlay, HashRouter &hashRouter, NetworkOPs &networkOPs)
Apply multiple published lists of public keys, then broadcast it to all peers that have not seen it o...
void onRequestTimeout(std::size_t siteIdx, error_code const &ec)
request took too long
void setTimer(std::lock_guard< std::mutex > const &, std::lock_guard< std::mutex > const &)
Queue the next site to be fetched. Locks over sites_mutex_ and state_mutex_ are required.
bool load(std::vector< std::string > const &siteURIs)
Load configured site URIs.
void join()
Wait for current fetches from sites to complete.
std::shared_ptr< Site::Resource > processRedirect(detail::response_type const &res, std::size_t siteIdx, std::lock_guard< std::mutex > const &)
Interpret a redirect response.
bool missingSite(std::lock_guard< std::mutex > const &)
If no sites are provided, or a site fails to load, get a list of local cache files from the Validator...
std::atomic< bool > pending_
std::atomic< bool > fetching_
void start()
Start fetching lists from sites.
std::chrono::seconds const requestTimeout_
boost::asio::basic_waitable_timer< clock_type > timer_
boost::asio::ip::tcp::endpoint endpoint_type
void makeRequest(std::shared_ptr< Site::Resource > resource, std::size_t siteIdx, std::lock_guard< std::mutex > const &)
Initiate request to given resource.
Json::Value getJson() const
Return JSON representation of configured validator sites.
std::condition_variable cv_
void parseJsonResponse(std::string const &res, std::size_t siteIdx, std::lock_guard< std::mutex > const &)
Parse json response from validator list site.
beast::Journal const j_
std::atomic< bool > stopping_
ValidatorSite(Application &app, std::optional< beast::Journal > j=std::nullopt, std::chrono::seconds timeout=std::chrono::seconds{20})
boost::system::error_code error_code
std::weak_ptr< detail::Work > work_
Application & app_
void onSiteFetch(boost::system::error_code const &ec, endpoint_type const &endpoint, detail::response_type const &res, std::size_t siteIdx)
Store latest list fetched from site.
void onTextFetch(boost::system::error_code const &ec, std::string const &res, std::size_t siteIdx)
Store latest list fetched from anywhere.
void onTimer(std::size_t siteIdx, error_code const &ec)
Fetch site whose time has come.
void stop()
Stop fetching lists from sites.
std::vector< Site > sites_
T data(T... args)
T distance(T... args)
T empty(T... args)
T is_same_v
T make_tuple(T... args)
T min_element(T... args)
@ arrayValue
array value (ordered list)
Definition json_value.h:25
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:26
STL namespace.
TER valid(STTx const &tx, ReadView const &view, AccountID const &src, beast::Journal j)
boost::beast::http::response< boost::beast::http::string_body > response_type
Definition Work.h:10
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
sha512_half_hasher::result_type sha512Half(Args const &... args)
Returns the SHA512-Half of a series of objects.
Definition digest.h:204
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:602
@ unsupported_version
List version is not supported.
@ stale
Trusted publisher key, but seq is too old.
@ accepted
List is valid.
@ untrusted
List signed by untrusted publisher key.
@ same_sequence
Same sequence as current list.
@ pending
List will be valid in the future.
@ known_sequence
Future sequence already seen.
@ expired
List is expired, but has the largest non-pending sequence seen so far.
@ invalid
Invalid format or signature.
bool parseUrl(parsedURL &pUrl, std::string const &strUrl)
unsigned short constexpr max_redirects
@ manifest
Manifest.
auto constexpr error_retry_interval
auto constexpr default_refresh_interval
T size(T... args)
T str(T... args)
std::shared_ptr< Resource > startingResource
the resource to request at <timer> intervals.
clock_type::time_point nextRefresh
std::chrono::minutes refreshInterval
std::shared_ptr< Resource > loadedResource
the original uri as loaded from config
std::optional< std::uint16_t > port
T substr(T... args)
T to_string(T... args)
T what(T... args)