rippled
Loading...
Searching...
No Matches
ClosureCounter.h
1#ifndef XRPL_CORE_CLOSURE_COUNTER_H_INCLUDED
2#define XRPL_CORE_CLOSURE_COUNTER_H_INCLUDED
3
#include <xrpl/basics/Log.h>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <type_traits>
#include <utility>
10
11namespace ripple {
12
34template <typename Ret_t, typename... Args_t>
36{
37private:
38 std::mutex mutable mutex_{};
40 bool waitForClosures_{false}; // guard with mutex_
42
43 // Increment the count.
46 {
48 return *this;
49 }
50
51 // Decrement the count. If we're stopping and the count drops to zero
52 // notify allClosuresDoneCond_.
55 {
56 // Even though closureCount_ is atomic, we decrement its value under
57 // a lock. This removes a small timing window that occurs if the
58 // waiting thread is handling a spurious wakeup when closureCount_
59 // drops to zero.
61
62 // Update closureCount_. Notify if stopping and closureCount_ == 0.
63 if ((--closureCount_ == 0) && waitForClosures_)
65 return *this;
66 }
67
68 // A private template class that helps count the number of closures
69 // in flight. This allows callers to block until all their postponed
70 // closures are dispatched.
71 template <typename Closure>
73 {
74 private:
77
78 static_assert(
80 value,
81 "Closure arguments don't match ClosureCounter Ret_t or Args_t");
82
83 public:
84 Substitute() = delete;
85
88 {
89 ++counter_;
90 }
91
98
99 Substitute(ClosureCounter& counter, Closure&& closure)
100 : counter_(counter), closure_(std::forward<Closure>(closure))
101 {
102 ++counter_;
103 }
104
106 operator=(Substitute const& rhs) = delete;
108 operator=(Substitute&& rhs) = delete;
109
111 {
112 --counter_;
113 }
114
115 // Note that Args_t is not deduced, it is explicit. So Args_t&&
116 // would be an rvalue reference, not a forwarding reference. We
117 // want to forward exactly what the user declared.
118 Ret_t
119 operator()(Args_t... args)
120 {
121 return closure_(std::forward<Args_t>(args)...);
122 }
123 };
124
125public:
126 ClosureCounter() = default;
127 // Not copyable or movable. Outstanding counts would be hard to sort out.
129
131 operator=(ClosureCounter const&) = delete;
132
135 {
136 using namespace std::chrono_literals;
137 join("ClosureCounter", 1s, debugLog());
138 }
139
146 void
148 {
150 waitForClosures_ = true;
151 if (closureCount_ > 0)
152 {
154 lock, wait, [this] { return closureCount_ == 0; }))
155 {
156 if (auto stream = j.error())
157 stream << name << " waiting for ClosureCounter::join().";
159 lock, [this] { return closureCount_ == 0; });
160 }
161 }
162 }
163
171 template <class Closure>
173 wrap(Closure&& closure)
174 {
176
178 if (!waitForClosures_)
179 ret.emplace(*this, std::forward<Closure>(closure));
180
181 return ret;
182 }
183
185 int
186 count() const
187 {
188 return closureCount_;
189 }
190
197 bool
198 joined() const
199 {
201 return waitForClosures_;
202 }
203};
204
205} // namespace ripple
206
207#endif // XRPL_CORE_CLOSURE_COUNTER_H_INCLUDED
A generic endpoint for log messages.
Definition Journal.h:41
Stream error() const
Definition Journal.h:327
Substitute(Substitute const &rhs)
Substitute(Substitute &&rhs) noexcept(std::is_nothrow_move_constructible< Closure >::value)
std::remove_reference_t< Closure > closure_
Substitute & operator=(Substitute const &rhs)=delete
Substitute & operator=(Substitute &&rhs)=delete
Substitute(ClosureCounter &counter, Closure &&closure)
The role of a ClosureCounter is to assist in shutdown by letting callers wait for the completion of closures that are still in flight.
bool joined() const
Returns true if this has been joined.
std::optional< Substitute< Closure > > wrap(Closure &&closure)
Wrap the passed closure with a reference counter.
ClosureCounter & operator++()
ClosureCounter(ClosureCounter const &)=delete
std::atomic< int > closureCount_
ClosureCounter & operator--()
int count() const
Current number of Closures outstanding.
ClosureCounter & operator=(ClosureCounter const &)=delete
~ClosureCounter()
Destructor verifies all in-flight closures are complete.
std::condition_variable allClosuresDoneCond_
void join(char const *name, std::chrono::milliseconds wait, beast::Journal j)
Returns once all counted in-flight closures are destroyed.
T emplace(T... args)
T is_same_v
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
beast::Journal debugLog()
Returns a debug journal.
Definition Log.cpp:457
STL namespace.