rippled
Loading...
Searching...
No Matches
ClosureCounter.h
1#pragma once
2
3#include <xrpl/basics/Log.h>
4
5#include <atomic>
6#include <condition_variable>
7#include <mutex>
8#include <optional>
9
10namespace xrpl {
11
33template <typename Ret_t, typename... Args_t>
35{
36private:
37 std::mutex mutable mutex_{};
39 bool waitForClosures_{false}; // guard with mutex_
41
42 // Increment the count.
// Returns a reference to this counter.
// NOTE(review): the signature line and the increment statement itself are
// absent from this listing; only the braces and the return are visible here.
// Confirm against the real header.
45 {
47 return *this;
48 }
49
50 // Decrement the count. If we're stopping and the count drops to zero
51 // notify allClosuresDoneCond_.
// "Stopping" here means waitForClosures_ is set, which join() does.
// Returns a reference to this counter.
54 {
55 // Even though closureCount_ is atomic, we decrement its value under
56 // a lock. This removes a small timing window that occurs if the
57 // waiting thread is handling a spurious wakeup when closureCount_
58 // drops to zero.
// NOTE(review): the statement that actually takes the lock is not visible
// in this listing (a line is absent right after the comment above).
60
61 // Update closureCount_. Notify if stopping and closureCount_ == 0.
62 if ((--closureCount_ == 0) && waitForClosures_)
// NOTE(review): the statement controlled by this `if` is absent from the
// listing (presumably the notify on allClosuresDoneCond_ described above).
// As shown, the `return` below would read as the conditional statement, so
// do not treat this text as the real control flow without checking.
64 return *this;
65 }
66
67 // A private template class that helps count the number of closures
68 // in flight. This allows callers to block until all their postponed
69 // closures are dispatched.
70 template <typename Closure>
72 {
73 private:
76
77 static_assert(
79 "Closure arguments don't match ClosureCounter Ret_t or Args_t");
80
81 public:
82 Substitute() = delete;
83
// NOTE(review): the signature for this body is absent from the listing;
// presumably it belongs to one of the Substitute copy/move constructors.
// Whatever the overload, a newly constructed wrapper increments the counter
// so that it is counted as one more closure in flight.
85 {
86 ++counter_;
87 }
88
94
// Construct a counted wrapper around `closure`.
//
// counter: the ClosureCounter this wrapper reports to; used to
//          initialize counter_.
// closure: the callable to wrap; forwarded into closure_ with
//          std::forward<Closure>.
//
// After the members are initialized the counter is incremented, which
// records one more closure in flight.
95 Substitute(ClosureCounter& counter, Closure&& closure)
96 : counter_(counter), closure_(std::forward<Closure>(closure))
97 {
98 ++counter_;
99 }
100
102 operator=(Substitute const& rhs) = delete;
104 operator=(Substitute&& rhs) = delete;
105
// NOTE(review): the signature for this body is absent from the listing;
// presumably it is the Substitute destructor. It decrements the counter,
// the inverse of the increment performed by the constructors.
107 {
108 --counter_;
109 }
110
// Invoke the wrapped closure_ with the supplied arguments and return
// whatever it returns (Ret_t).
//
111 // Note that Args_t is not deduced, it is explicit. So Args_t&&
112 // would be an rvalue reference, not a forwarding reference. We
113 // want to forward exactly what the user declared.
// Hence the parameters are declared as plain Args_t and are handed to
// closure_ through std::forward<Args_t>.
114 Ret_t
115 operator()(Args_t... args)
116 {
117 return closure_(std::forward<Args_t>(args)...);
118 }
119 };
120
121public:
122 ClosureCounter() = default;
123 // Not copyable or movable. Outstanding counts would be hard to sort out.
125
127 operator=(ClosureCounter const&) = delete;
128
// Destructor body: verifies all in-flight closures are complete.
// NOTE(review): the `~ClosureCounter()` signature line is absent from this
// listing.
//
// Calls join() with the name "ClosureCounter", an initial wait of one
// second (`1s`, enabled by the chrono literals using-directive) and the
// journal returned by debugLog(). join() does not return until
// closureCount_ is zero, so destruction blocks while closures remain.
131 {
132 using namespace std::chrono_literals;
133 join("ClosureCounter", 1s, debugLog());
134 }
135
// Returns once all counted in-flight closures are destroyed, i.e. once
// closureCount_ has reached zero.
//
// name: prefix written at the start of the log message below.
// wait: how long to wait before logging that the join is still pending.
// j:    journal whose error() stream receives that message.
//
// NOTE(review): the signature line and the declaration of `lock` are absent
// from this listing -- confirm both against the real header.
142 void
144 {
// From here on wrap() stops handing out new wrappers.
146 waitForClosures_ = true;
147 if (closureCount_ > 0)
148 {
// First a bounded wait; wait_for() with a predicate yields false when the
// timeout expires while the predicate is still false.
149 if (!allClosuresDoneCond_.wait_for(lock, wait, [this] { return closureCount_ == 0; }))
150 {
// Report once (only if the error stream is active), then wait with no
// time limit until the count reaches zero.
151 if (auto stream = j.error())
152 stream << name << " waiting for ClosureCounter::join().";
153 allClosuresDoneCond_.wait(lock, [this] { return closureCount_ == 0; });
154 }
155 }
156 }
157
// Wrap the passed closure with a reference counter.
//
// closure: the callable to wrap; forwarded into the new wrapper together
//          with a reference to this counter.
//
// Returns `ret`. A wrapper is emplaced into it only while waitForClosures_
// is still false; once join() has set that flag, `ret` is returned without
// a wrapper being created.
//
// NOTE(review): the return-type line, the declaration of `ret`, and one
// further line before the `if` (presumably a lock on mutex_) are absent
// from this listing -- confirm against the real header.
165 template <class Closure>
167 wrap(Closure&& closure)
168 {
170
172 if (!waitForClosures_)
173 ret.emplace(*this, std::forward<Closure>(closure));
174
175 return ret;
176 }
177
// Current number of closures outstanding.
//
// Returns the value of closureCount_ at the moment of the read. No lock is
// taken here; the value is read directly from closureCount_.
179 int
180 count() const
181 {
182 return closureCount_;
183 }
184
// Returns true if this has been joined, i.e. the current value of
// waitForClosures_, which join() sets to true.
//
// NOTE(review): one line between the opening brace and the return is
// absent from this listing; since waitForClosures_ is documented as
// guarded by mutex_, it is presumably the lock acquisition -- confirm.
191 bool
192 joined() const
193 {
195 return waitForClosures_;
196 }
197};
198
199} // namespace xrpl
A generic endpoint for log messages.
Definition Journal.h:40
Stream error() const
Definition Journal.h:318
Substitute(ClosureCounter &counter, Closure &&closure)
std::remove_reference_t< Closure > closure_
Ret_t operator()(Args_t... args)
Substitute(Substitute const &rhs)
Substitute & operator=(Substitute const &rhs)=delete
Substitute(Substitute &&rhs) noexcept(std::is_nothrow_move_constructible< Closure >::value)
Substitute & operator=(Substitute &&rhs)=delete
The role of a ClosureCounter is to assist in shutdown by letting callers wait for the completion of closures.
~ClosureCounter()
Destructor verifies all in-flight closures are complete.
bool joined() const
Returns true if this has been joined.
ClosureCounter(ClosureCounter const &)=delete
ClosureCounter & operator--()
std::atomic< int > closureCount_
ClosureCounter()=default
std::optional< Substitute< Closure > > wrap(Closure &&closure)
Wrap the passed closure with a reference counter.
ClosureCounter & operator++()
int count() const
Current number of Closures outstanding.
ClosureCounter & operator=(ClosureCounter const &)=delete
void join(char const *name, std::chrono::milliseconds wait, beast::Journal j)
Returns once all counted in-flight closures are destroyed.
std::condition_variable allClosuresDoneCond_
T emplace(T... args)
T is_same_v
STL namespace.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
beast::Journal debugLog()
Returns a debug journal.
Definition Log.cpp:445