rippled
Loading...
Searching...
No Matches
TimeoutCounter.cpp
1#include <xrpld/app/ledger/detail/TimeoutCounter.h>
2#include <xrpld/core/JobQueue.h>
3
4namespace ripple {
5
6using namespace std::chrono_literals;
7
// Constructor: stores the collaborators, clears the state flags, creates the
// timer and sanity-checks the timer interval.
// NOTE(review): this extract is missing listing lines 8 and 11 -- presumably
// the `TimeoutCounter::TimeoutCounter(` line and the declaration of
// `interval` (used below to initialize timerInterval_). Confirm against the
// original file.
9 Application& app,
10 uint256 const& hash,
12 QueueJobParameter&& jobParameter,
13 beast::Journal journal)
14 : app_(app)
15 , journal_(journal)
16 , hash_(hash)
// Counter and status flags all start cleared.
17 , timeouts_(0)
18 , complete_(false)
19 , failed_(false)
20 , progress_(false)
21 , timerInterval_(interval)
// jobParameter arrives as an rvalue reference and is moved into the member.
22 , queueJobParameter_(std::move(jobParameter))
// The timer is constructed from the application's I/O context.
23 , timer_(app_.getIOContext())
24{
// The interval must lie strictly between 10ms and 30s (both bounds exclusive).
25 XRPL_ASSERT(
26 (timerInterval_ > 10ms) && (timerInterval_ < 30s),
27 "ripple::TimeoutCounter::TimeoutCounter : interval input inside range");
28}
29
// Arms timer_ to expire timerInterval_ from now; when the wait completes
// normally, the handler calls queueJob() on the object if it still exists.
// NOTE(review): the signature line (listing line 31) is missing from this
// extract; judging by the body and the member summary this appears to be
// TimeoutCounter::setTimer(ScopedLockType&) -- confirm.
30void
32{
// Nothing is scheduled once isDone() reports true.
33 if (isDone())
34 return;
35 timer_.expires_after(timerInterval_);
36 timer_.async_wait(
// The handler captures only the weak pointer returned by pmDowncast(), so it
// can detect that the object has already been destroyed.
37 [wptr = pmDowncast()](boost::system::error_code const& ec) {
// operation_aborted: the wait did not expire normally (Boost.Asio reports
// this when a pending wait is cancelled), so there is nothing to do.
38 if (ec == boost::asio::error::operation_aborted)
39 return;
40
// Proceed only if the object is still alive.
41 if (auto ptr = wptr.lock())
42 {
// queueJob() takes a lock argument, so construct one on the object's mutex
// before calling it.
43 ScopedLockType sl(ptr->mtx_);
44 ptr->queueJob(sl);
45 }
46 });
47}
48
// Either defers the work by re-arming the timer, or hands off a callable that
// calls invokeOnTimer() on the object if it still exists.
// NOTE(review): this extract is missing listing lines 50, 54-56 and 64-66:
// the signature (appears to be TimeoutCounter::queueJob(ScopedLockType& sl)
// per the member summary), the condition guarding the deferral block, and the
// start of the call that receives the lambda (presumably a JobQueue::addJob
// call) -- confirm all three against the original file.
49void
51{
// Nothing to queue once isDone() reports true.
52 if (isDone())
53 return;
// Deferral path (its guarding condition is not visible here): log at debug
// level, re-arm the timer with the caller's lock, and return.
57 {
58 JLOG(journal_.debug()) << "Deferring " << queueJobParameter_.jobName
59 << " timer due to load";
60 setTimer(sl);
61 return;
62 }
63
// The callable captures only a weak pointer; it calls invokeOnTimer() only
// if the object can still be locked.
67 [wptr = pmDowncast()]() {
68 if (auto sptr = wptr.lock(); sptr)
69 sptr->invokeOnTimer();
70 });
71}
72
// Runs one timer tick: counts a timeout when no progress was recorded,
// forwards to the onTimer() hook, and re-arms the timer unless done.
// NOTE(review): listing lines 74 and 76 are missing from this extract: the
// signature (appears to be TimeoutCounter::invokeOnTimer() per the member
// summary) and, presumably, the declaration of `sl` used below (likely a
// ScopedLockType on mtx_) -- confirm.
73void
75{
77
// Skip the tick entirely once isDone() reports true.
78 if (isDone())
79 return;
80
81 if (!progress_)
82 {
// progress_ was not set: count a timeout, log it, and call the hook with
// progress == false.
83 ++timeouts_;
84 JLOG(journal_.debug()) << "Timeout(" << timeouts_ << ") "
85 << " acquiring " << hash_;
86 onTimer(false, sl);
87 }
88 else
89 {
// progress_ was set: clear it for the next tick and call the hook with
// progress == true.
90 progress_ = false;
91 onTimer(true, sl);
92 }
93
// onTimer() may have finished the task; only re-arm if it did not.
94 if (!isDone())
95 setTimer(sl);
96}
97
// Marks the task as failed and logs the cancellation, but only if the task
// is not already done.
// NOTE(review): listing lines 99 and 101 are missing from this extract: the
// signature (appears to be TimeoutCounter::cancel() per the member summary)
// and one body line (presumably a lock acquisition on mtx_) -- confirm.
98void
100{
// An already-done task is left untouched and nothing is logged.
102 if (!isDone())
103 {
104 failed_ = true;
105 JLOG(journal_.info()) << "Cancel " << hash_;
106 }
107}
108
109} // namespace ripple
A generic endpoint for log messages.
Definition Journal.h:41
Stream debug() const
Definition Journal.h:309
Stream info() const
Definition Journal.h:315
virtual JobQueue & getJobQueue()=0
int getJobCountTotal(JobType t) const
Jobs waiting plus running at this priority.
Definition JobQueue.cpp:133
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:149
void queueJob(ScopedLockType &)
Queue a job to call invokeOnTimer().
virtual void onTimer(bool progress, ScopedLockType &)=0
Hook called from invokeOnTimer().
bool progress_
Whether forward progress has been made.
void setTimer(ScopedLockType &)
Schedule a call to queueJob() after timerInterval_.
void invokeOnTimer()
Calls onTimer() if in the right state.
std::chrono::milliseconds timerInterval_
The minimum time to wait between calls to execute().
QueueJobParameter queueJobParameter_
uint256 const hash_
The hash of the object (in practice, always a ledger) we are trying to fetch.
boost::asio::basic_waitable_timer< std::chrono::steady_clock > timer_
virtual std::weak_ptr< TimeoutCounter > pmDowncast()=0
Return a weak pointer to this.
TimeoutCounter(Application &app, uint256 const &targetHash, std::chrono::milliseconds timeoutInterval, QueueJobParameter &&jobParameter, beast::Journal journal)
virtual void cancel()
Cancel the task by marking it as failed if the task is not done.
std::recursive_mutex mtx_
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
STL namespace.
std::optional< std::uint32_t > jobLimit