rippled
Loading...
Searching...
No Matches
TimeoutCounter.cpp
1#include <xrpld/app/ledger/detail/TimeoutCounter.h>
2
3#include <xrpl/core/JobQueue.h>
4
5namespace xrpl {
6
7using namespace std::chrono_literals;
8
// Constructor: parameter list, member-initializer list and body.
// NOTE(review): this listing skips its own lines 9 and 12, so the line that
// names the constructor and the declaration of `interval` are not visible
// here. `interval` is presumably the std::chrono::milliseconds argument --
// confirm against the original file.
10 Application& app,
11 uint256 const& hash,
13 QueueJobParameter&& jobParameter,
14 beast::Journal journal)
15 : app_(app)
16 , journal_(journal)
17 , hash_(hash)
18 , timerInterval_(interval)
// jobParameter is moved into the member rather than copied.
19 , queueJobParameter_(std::move(jobParameter))
// The timer is constructed from the application's I/O context.
20 , timer_(app_.getIOContext())
21{
// The interval must lie strictly between 10ms and 30s (both bounds are
// exclusive).
22 XRPL_ASSERT(
23 (timerInterval_ > 10ms) && (timerInterval_ < 30s),
24 "xrpl::TimeoutCounter::TimeoutCounter : interval input inside range");
25}
26
// Arms timer_ to expire after timerInterval_ and registers a completion
// handler that calls queueJob().
// NOTE(review): listing line 28 (the function name and parameter list) is
// absent from this listing; this body appears to be setTimer(ScopedLockType&)
// -- confirm against the original file.
27void
29{
// Nothing is scheduled once isDone() reports true.
30 if (isDone())
31 return;
32 timer_.expires_after(timerInterval_);
// The handler captures whatever pmDowncast() returns and only proceeds if
// lock() on it succeeds.
33 timer_.async_wait([wptr = pmDowncast()](boost::system::error_code const& ec) {
// An aborted wait does nothing.
34 if (ec == boost::asio::error::operation_aborted)
35 return;
36
37 if (auto ptr = wptr.lock())
38 {
// Take the object's mtx_ and hand the lock to queueJob().
39 ScopedLockType sl(ptr->mtx_);
40 ptr->queueJob(sl);
41 }
42 });
43}
44
// Either defers by re-arming the timer, or builds a callable that calls
// invokeOnTimer() on this object.
// NOTE(review): listing lines 46 (function name and parameter list), 50-52
// and 60-61 are absent from this listing; this body appears to be
// queueJob(ScopedLockType&) -- confirm against the original file.
45void
47{
// Nothing to do once isDone() reports true.
48 if (isDone())
49 return;
// NOTE(review): the condition guarding this branch is on the missing lines.
// Judging only by the log text it is some kind of load check -- confirm.
53 {
54 JLOG(journal_.debug()) << "Deferring " << queueJobParameter_.jobName
55 << " timer due to load";
// Re-arm the timer instead of proceeding, then return.
56 setTimer(sl);
57 return;
58 }
59
// NOTE(review): the statement that opens this callable (and introduces
// `wptr`) is on the missing lines; presumably the callable is submitted to
// the job queue -- confirm.
// Callable body: invokeOnTimer() runs only if wptr can still be locked.
62 if (auto sptr = wptr.lock(); sptr)
63 sptr->invokeOnTimer();
64 });
65}
66
// Calls the onTimer() hook, telling it whether progress_ was set, and re-arms
// the timer if the object is still not done afterwards.
// NOTE(review): listing lines 68 (function name and parameter list) and 70
// are absent from this listing. `sl`, used below, is presumably declared on
// line 70 as a lock on mtx_ -- confirm against the original file.
67void
69{
71
// Nothing to do once isDone() reports true.
72 if (isDone())
73 return;
74
75 if (!progress_)
76 {
// No progress flag set: count a timeout, log it, and call the hook with
// progress == false.
77 ++timeouts_;
78 JLOG(journal_.debug()) << "Timeout(" << timeouts_ << ") "
79 << " acquiring " << hash_;
80 onTimer(false, sl);
81 }
82 else
83 {
// Progress flag was set: clear it, then call the hook with progress == true.
84 progress_ = false;
85 onTimer(true, sl);
86 }
87
// isDone() is checked again after the hook; the timer is re-armed only if it
// is still false.
88 if (!isDone())
89 setTimer(sl);
90}
91
// Marks the object as failed, under mtx_, unless isDone() already reports
// true.
// NOTE(review): listing line 93 (the function name and parameter list) is
// absent from this listing; this body appears to be cancel() -- confirm
// against the original file.
92void
94{
95 ScopedLockType const sl(mtx_);
96 if (!isDone())
97 {
// Set failed_ and log the hash at info level.
98 failed_ = true;
99 JLOG(journal_.info()) << "Cancel " << hash_;
100 }
101}
102
103} // namespace xrpl
A generic endpoint for log messages.
Definition Journal.h:40
Stream debug() const
Definition Journal.h:301
Stream info() const
Definition Journal.h:307
int getJobCountTotal(JobType t) const
Jobs waiting plus running at this priority.
Definition JobQueue.cpp:115
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:147
virtual JobQueue & getJobQueue()=0
TimeoutCounter(Application &app, uint256 const &targetHash, std::chrono::milliseconds timeoutInterval, QueueJobParameter &&jobParameter, beast::Journal journal)
boost::asio::basic_waitable_timer< std::chrono::steady_clock > timer_
std::recursive_mutex mtx_
virtual std::weak_ptr< TimeoutCounter > pmDowncast()=0
Return a weak pointer to this.
virtual void onTimer(bool progress, ScopedLockType &)=0
Hook called from invokeOnTimer().
uint256 const hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
void queueJob(ScopedLockType &)
Queue a job to call invokeOnTimer().
bool progress_
Whether forward progress has been made.
beast::Journal journal_
QueueJobParameter queueJobParameter_
void invokeOnTimer()
Calls onTimer() if in the right state.
void setTimer(ScopedLockType &)
Schedule a call to queueJob() after timerInterval_.
std::chrono::milliseconds timerInterval_
The minimum time to wait between calls to execute().
virtual void cancel()
Cancel the task by marking it as failed if the task is not done.
STL namespace.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
std::optional< std::uint32_t > jobLimit