xrpld
Loading...
Searching...
No Matches
TimeoutCounter.cpp
1#include <xrpld/app/ledger/detail/TimeoutCounter.h>
2
3#include <xrpld/app/main/Application.h>
4
5#include <xrpl/basics/Log.h>
6#include <xrpl/basics/base_uint.h>
7#include <xrpl/beast/utility/Journal.h>
8#include <xrpl/beast/utility/instrumentation.h>
9#include <xrpl/core/JobQueue.h>
10
11#include <boost/asio/error.hpp>
12#include <boost/system/detail/error_code.hpp>
13
14#include <chrono>
15#include <utility>
16
17namespace xrpl {
18
19using namespace std::chrono_literals;
20
// Constructor: stores the collaborators and settings passed in, and creates the
// timer used for periodic timeouts.
// NOTE(review): this listing omits the line that opens the definition and the
// line declaring `interval` (used in the initialiser list below) - confirm
// both against the real source file.
22 Application& app,
23 uint256 const& hash,
25 QueueJobParameter&& jobParameter,
26 beast::Journal journal)
27 : app_(app)
28 , journal_(journal)
29 , hash_(hash)
30 , timerInterval_(interval)
// jobParameter arrives as an rvalue reference and is moved into the member.
31 , queueJobParameter_(std::move(jobParameter))
// The timer is constructed from the application's I/O context.
32 , timer_(app_.getIOContext())
33{
// Both bounds are exclusive: the interval must be strictly greater than 10ms
// and strictly less than 30s.
34 XRPL_ASSERT(
35 (timerInterval_ > 10ms) && (timerInterval_ < 30s),
36 "xrpl::TimeoutCounter::TimeoutCounter : interval input inside range");
37}
38
// Arms timer_ to expire after timerInterval_ and, when it fires, calls
// queueJob() on the object. Does nothing if isDone() is already true.
// NOTE(review): the line naming this function is missing from the listing; the
// body and the setTimer(sl) calls elsewhere in this file suggest it is
// TimeoutCounter::setTimer(ScopedLockType&) - confirm.
39void
41{
42 if (isDone())
43 return;
44 timer_.expires_after(timerInterval_);
// Only the weak pointer returned by pmDowncast() is captured, so the pending
// wait does not by itself keep the object alive.
45 timer_.async_wait([wptr = pmDowncast()](boost::system::error_code const& ec) {
// An aborted wait must not queue a job.
46 if (ec == boost::asio::error::operation_aborted)
47 return;
48
// Proceed only if the object still exists when the handler runs.
49 if (auto ptr = wptr.lock())
50 {
// queueJob() takes a lock reference, so acquire the object's mutex first.
51 ScopedLockType sl(ptr->mtx_);
52 ptr->queueJob(sl);
53 }
54 });
55}
56
// Adds a job to the application's job queue that calls invokeOnTimer(), unless
// the object is done or the job queue is at its configured limit.
// NOTE(review): the line naming this function is missing from the listing; `sl`
// is used below but its declaration is not visible - presumably it is the
// ScopedLockType& parameter of TimeoutCounter::queueJob - confirm.
57void
59{
60 if (isDone())
61 return;
// A jobLimit of zero disables this check. Otherwise, when the number of jobs of
// this type has reached the limit, re-arm the timer instead of adding a job.
62 if (queueJobParameter_.jobLimit &&
63 app_.getJobQueue().getJobCountTotal(queueJobParameter_.jobType) >=
64 queueJobParameter_.jobLimit)
65 {
66 JLOG(journal_.debug()) << "Deferring " << queueJobParameter_.jobName
67 << " timer due to load";
68 setTimer(sl);
69 return;
70 }
71
// The job captures only a weak pointer; if the object is gone by the time the
// job runs, the job does nothing.
72 app_.getJobQueue().addJob(
73 queueJobParameter_.jobType, queueJobParameter_.jobName, [wptr = pmDowncast()]() {
74 if (auto sptr = wptr.lock(); sptr)
75 sptr->invokeOnTimer();
76 });
77}
78
// Runs one timer tick: calls the onTimer() hook with whether progress was made
// since the last tick, then re-arms the timer if the object is still not done.
// NOTE(review): the listing omits the line naming this function and the line
// that declares `sl`; presumably `sl` is a ScopedLockType acquired on mtx_ at
// the top of the body - confirm against the real source file.
79void
81{
83
84 if (isDone())
85 return;
86
87 if (!progress_)
88 {
// No progress since the previous tick: count it as a timeout.
89 ++timeouts_;
90 JLOG(journal_.debug()) << "Timeout(" << timeouts_ << ") "
91 << " acquiring " << hash_;
92 onTimer(false, sl);
93 }
94 else
95 {
// Progress was made: clear the flag before calling the hook so the next tick
// starts from "no progress".
96 progress_ = false;
97 onTimer(true, sl);
98 }
99
// isDone() is checked again after the hook has run; the timer is re-armed only
// if the object is still not done.
100 if (!isDone())
101 setTimer(sl);
102}
103
// Marks the object as failed, under mtx_, if it is not already done; otherwise
// leaves it untouched.
// NOTE(review): the line naming this function is missing from the listing; the
// "Cancel" log message suggests it is TimeoutCounter::cancel() - confirm.
104void
106{
107 ScopedLockType const sl(mtx_);
108 if (!isDone())
109 {
110 failed_ = true;
111 JLOG(journal_.info()) << "Cancel " << hash_;
112 }
113}
114
115} // namespace xrpl
A generic endpoint for log messages.
Definition Journal.h:38
TimeoutCounter(Application &app, uint256 const &targetHash, std::chrono::milliseconds timeoutInterval, QueueJobParameter &&jobParameter, beast::Journal journal)
boost::asio::basic_waitable_timer< std::chrono::steady_clock > timer_
std::recursive_mutex mtx_
virtual std::weak_ptr< TimeoutCounter > pmDowncast()=0
Return a weak pointer to this.
virtual void onTimer(bool progress, ScopedLockType &)=0
Hook called from invokeOnTimer().
std::unique_lock< std::recursive_mutex > ScopedLockType
uint256 const hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
void queueJob(ScopedLockType &)
Queue a job to call invokeOnTimer().
bool progress_
Whether forward progress has been made.
beast::Journal journal_
QueueJobParameter queueJobParameter_
void invokeOnTimer()
Calls onTimer() if in the right state.
void setTimer(ScopedLockType &)
Schedule a call to queueJob() after timerInterval_.
std::chrono::milliseconds timerInterval_
The minimum time to wait between calls to execute().
virtual void cancel()
Cancel the task by marking it as failed if the task is not done.
STL namespace.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
BaseUInt< 256 > uint256
Definition base_uint.h:562