rippled
Loading...
Searching...
No Matches
TimeoutCounter.cpp
1#include <xrpld/app/ledger/detail/TimeoutCounter.h>
2
3#include <xrpl/core/JobQueue.h>
4
5namespace xrpl {
6
7using namespace std::chrono_literals;
8
10 Application& app,
11 uint256 const& hash,
13 QueueJobParameter&& jobParameter,
14 beast::Journal journal)
15 : app_(app)
16 , journal_(journal)
17 , hash_(hash)
18 , timeouts_(0)
19 , complete_(false)
20 , failed_(false)
21 , progress_(false)
22 , timerInterval_(interval)
23 , queueJobParameter_(std::move(jobParameter))
24 , timer_(app_.getIOContext())
25{
26 XRPL_ASSERT(
27 (timerInterval_ > 10ms) && (timerInterval_ < 30s),
28 "xrpl::TimeoutCounter::TimeoutCounter : interval input inside range");
29}
30
31void
33{
34 if (isDone())
35 return;
36 timer_.expires_after(timerInterval_);
37 timer_.async_wait(
38 [wptr = pmDowncast()](boost::system::error_code const& ec) {
39 if (ec == boost::asio::error::operation_aborted)
40 return;
41
42 if (auto ptr = wptr.lock())
43 {
44 ScopedLockType sl(ptr->mtx_);
45 ptr->queueJob(sl);
46 }
47 });
48}
49
50void
52{
53 if (isDone())
54 return;
58 {
59 JLOG(journal_.debug()) << "Deferring " << queueJobParameter_.jobName
60 << " timer due to load";
61 setTimer(sl);
62 return;
63 }
64
68 [wptr = pmDowncast()]() {
69 if (auto sptr = wptr.lock(); sptr)
70 sptr->invokeOnTimer();
71 });
72}
73
74void
76{
78
79 if (isDone())
80 return;
81
82 if (!progress_)
83 {
84 ++timeouts_;
85 JLOG(journal_.debug()) << "Timeout(" << timeouts_ << ") "
86 << " acquiring " << hash_;
87 onTimer(false, sl);
88 }
89 else
90 {
91 progress_ = false;
92 onTimer(true, sl);
93 }
94
95 if (!isDone())
96 setTimer(sl);
97}
98
99void
101{
103 if (!isDone())
104 {
105 failed_ = true;
106 JLOG(journal_.info()) << "Cancel " << hash_;
107 }
108}
109
110} // namespace xrpl
A generic endpoint for log messages.
Definition Journal.h:41
Stream debug() const
Definition Journal.h:309
Stream info() const
Definition Journal.h:315
virtual JobQueue & getJobQueue()=0
int getJobCountTotal(JobType t) const
Jobs waiting plus running at this priority.
Definition JobQueue.cpp:131
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:148
TimeoutCounter(Application &app, uint256 const &targetHash, std::chrono::milliseconds timeoutInterval, QueueJobParameter &&jobParameter, beast::Journal journal)
boost::asio::basic_waitable_timer< std::chrono::steady_clock > timer_
std::recursive_mutex mtx_
virtual std::weak_ptr< TimeoutCounter > pmDowncast()=0
Return a weak pointer to this.
virtual void onTimer(bool progress, ScopedLockType &)=0
Hook called from invokeOnTimer().
uint256 const hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
void queueJob(ScopedLockType &)
Queue a job to call invokeOnTimer().
bool progress_
Whether forward progress has been made.
beast::Journal journal_
QueueJobParameter queueJobParameter_
void invokeOnTimer()
Calls onTimer() if in the right state.
void setTimer(ScopedLockType &)
Schedule a call to queueJob() after mTimerInterval.
std::chrono::milliseconds timerInterval_
The minimum time to wait between calls to execute().
virtual void cancel()
Cancel the task by marking it as failed if the task is not done.
STL namespace.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
std::optional< std::uint32_t > jobLimit