rippled
Loading...
Searching...
No Matches
ClosureCounter.h
1#pragma once
2
3#include <xrpl/basics/Log.h>
4
5#include <atomic>
6#include <condition_variable>
7#include <mutex>
8#include <optional>
9
10namespace xrpl {
11
33template <typename Ret_t, typename... Args_t>
35{
36private:
37 std::mutex mutable mutex_{};
39 bool waitForClosures_{false}; // guard with mutex_
41
42 // Increment the count.
45 {
47 return *this;
48 }
49
50 // Decrement the count. If we're stopping and the count drops to zero
51 // notify allClosuresDoneCond_.
54 {
55 // Even though closureCount_ is atomic, we decrement its value under
56 // a lock. This removes a small timing window that occurs if the
57 // waiting thread is handling a spurious wakeup when closureCount_
58 // drops to zero.
59 std::lock_guard const lock{mutex_};
60
61 // Update closureCount_. Notify if stopping and closureCount_ == 0.
62 if ((--closureCount_ == 0) && waitForClosures_)
64 return *this;
65 }
66
67 // A private template class that helps count the number of closures
68 // in flight. This allows callers to block until all their postponed
69 // closures are dispatched.
70 template <typename Closure>
72 {
73 private:
76
77 static_assert(
79 "Closure arguments don't match ClosureCounter Ret_t or Args_t");
80
81 public:
82 Substitute() = delete;
83
85 {
86 ++counter_;
87 }
88
94
96 ClosureCounter& counter,
97 Closure&& closure) // NOLINT(cppcoreguidelines-rvalue-reference-param-not-moved)
98 : counter_(counter), closure_(std::forward<Closure>(closure))
99 {
100 ++counter_;
101 }
102
104 operator=(Substitute const& rhs) = delete;
106 operator=(Substitute&& rhs) = delete;
107
109 {
110 --counter_;
111 }
112
113 // Note that Args_t is not deduced, it is explicit. So Args_t&&
114 // would be an rvalue reference, not a forwarding reference. We
115 // want to forward exactly what the user declared.
116 Ret_t
117 operator()(Args_t... args)
118 {
119 return closure_(std::forward<Args_t>(args)...);
120 }
121 };
122
123public:
124 ClosureCounter() = default;
125 // Not copyable or movable. Outstanding counts would be hard to sort out.
127
129 operator=(ClosureCounter const&) = delete;
130
133 {
134 using namespace std::chrono_literals;
135 join("ClosureCounter", 1s, debugLog());
136 }
137
// Blocks the caller until closureCount_ has dropped to zero.
//
// Sets waitForClosures_ first; wrap() checks that flag and stops creating
// new counted wrappers once it is set. If closures are still outstanding,
// waits up to `wait` for the count to reach zero. On timeout, writes an
// error naming `name` to `j` and then keeps waiting with no further
// timeout.
//
// NOTE(review): the line carrying the function name/parameters and the
// line declaring `lock` are missing from this listing; `lock` is presumably
// a std::unique_lock on mutex_ - confirm against the real header.
144 void
146 {
148 waitForClosures_ = true;
149 if (closureCount_ > 0)
150 {
// Bounded wait: wait_for returns the predicate's value, so `false` here
// means the timeout expired with closures still in flight.
151 if (!allClosuresDoneCond_.wait_for(lock, wait, [this] { return closureCount_ == 0; }))
152 {
// Report the stall once (only if the error stream is active)...
153 if (auto stream = j.error())
154 stream << name << " waiting for ClosureCounter::join().";
// ...then block until the count really does reach zero.
155 allClosuresDoneCond_.wait(lock, [this] { return closureCount_ == 0; });
156 }
157 }
158 }
159
// Wraps `closure` in a Substitute bound to this counter and returns it in
// `ret`. A Substitute increments the counter when constructed and
// decrements it when destroyed, so the closure is counted for as long as
// the wrapper lives.
//
// If waitForClosures_ is already set, nothing is emplaced and `ret` is
// returned unchanged.
//
// NOTE(review): the return-type line and the declaration of `ret` are
// missing from this listing; `ret` is presumably a default-constructed
// (empty) std::optional<Substitute<Closure>> - confirm.
167 template <class Closure>
169 wrap(Closure&& closure)
170 {
172
// waitForClosures_ is guarded by mutex_, so take the lock before reading it.
173 std::lock_guard const lock{mutex_};
174 if (!waitForClosures_)
175 ret.emplace(*this, std::forward<Closure>(closure));
176
177 return ret;
178 }
179
181 int
182 count() const
183 {
184 return closureCount_;
185 }
186
193 bool
194 joined() const
195 {
196 std::lock_guard const lock{mutex_};
197 return waitForClosures_;
198 }
199};
200
201} // namespace xrpl
A generic endpoint for log messages.
Definition Journal.h:40
Stream error() const
Definition Journal.h:319
Substitute(ClosureCounter &counter, Closure &&closure)
std::remove_reference_t< Closure > closure_
Ret_t operator()(Args_t... args)
Substitute(Substitute const &rhs)
Substitute & operator=(Substitute const &rhs)=delete
Substitute(Substitute &&rhs) noexcept(std::is_nothrow_move_constructible< Closure >::value)
Substitute & operator=(Substitute &&rhs)=delete
The role of a ClosureCounter is to assist in shutdown by letting callers wait for the completion of closures.
~ClosureCounter()
Destructor verifies all in-flight closures are complete.
bool joined() const
Returns true if this has been joined.
ClosureCounter(ClosureCounter const &)=delete
ClosureCounter & operator--()
std::atomic< int > closureCount_
ClosureCounter()=default
std::optional< Substitute< Closure > > wrap(Closure &&closure)
Wrap the passed closure with a reference counter.
ClosureCounter & operator++()
int count() const
Current number of Closures outstanding.
ClosureCounter & operator=(ClosureCounter const &)=delete
void join(char const *name, std::chrono::milliseconds wait, beast::Journal j)
Returns once all counted in-flight closures are destroyed.
std::condition_variable allClosuresDoneCond_
T emplace(T... args)
T is_same_v
STL namespace.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
beast::Journal debugLog()
Returns a debug journal.
Definition Log.cpp:453