rippled
Loading...
Searching...
No Matches
Database.cpp
1#include <xrpl/basics/chrono.h>
2#include <xrpl/beast/core/CurrentThreadName.h>
3#include <xrpl/json/json_value.h>
4#include <xrpl/nodestore/Database.h>
5#include <xrpl/protocol/HashPrefix.h>
6#include <xrpl/protocol/jss.h>
7
8#include <chrono>
9
10namespace ripple {
11namespace NodeStore {
12
14 Scheduler& scheduler,
15 int readThreads,
16 Section const& config,
17 beast::Journal journal)
18 : j_(journal)
19 , scheduler_(scheduler)
20 , earliestLedgerSeq_(
21 get<std::uint32_t>(config, "earliest_seq", XRP_LEDGER_EARLIEST_SEQ))
22 , requestBundle_(get<int>(config, "rq_bundle", 4))
23 , readThreads_(std::max(1, readThreads))
24{
25 XRPL_ASSERT(
26 readThreads,
27 "ripple::NodeStore::Database::Database : nonzero threads input");
28
29 if (earliestLedgerSeq_ < 1)
30 Throw<std::runtime_error>("Invalid earliest_seq");
31
32 if (requestBundle_ < 1 || requestBundle_ > 64)
33 Throw<std::runtime_error>("Invalid rq_bundle");
34
35 for (int i = readThreads_.load(); i != 0; --i)
36 {
38 [this](int i) {
40
42 "db prefetch #" + std::to_string(i));
43
44 decltype(read_) read;
45
46 while (true)
47 {
48 {
50
51 if (isStopping())
52 break;
53
54 if (read_.empty())
55 {
57 readCondVar_.wait(lock);
59 }
60
61 if (isStopping())
62 break;
63
64 // extract multiple object at a time to minimize the
65 // overhead of acquiring the mutex.
66 for (int cnt = 0;
67 !read_.empty() && cnt != requestBundle_;
68 ++cnt)
69 read.insert(read_.extract(read_.begin()));
70 }
71
72 for (auto it = read.begin(); it != read.end(); ++it)
73 {
74 XRPL_ASSERT(
75 !it->second.empty(),
76 "ripple::NodeStore::Database::Database : non-empty "
77 "data");
78
79 auto const& hash = it->first;
80 auto const& data = it->second;
81 auto const seqn = data[0].first;
82
83 auto obj =
85
86 // This could be further optimized: if there are
87 // multiple requests for sequence numbers mapping to
88 // multiple databases by sorting requests such that all
89 // indices mapping to the same database are grouped
90 // together and serviced by a single read.
91 for (auto const& req : data)
92 {
93 req.second(
94 (seqn == req.first) || isSameDB(req.first, seqn)
95 ? obj
97 hash, req.first, FetchType::async));
98 }
99 }
100
101 read.clear();
102 }
103
105 --readThreads_;
106 },
107 i);
108 t.detach();
109 }
110}
111
113{
114 // NOTE!
115 // Any derived class should call the stop() method in its
116 // destructor. Otherwise, occasionally, the derived class may
117 // crash during shutdown when its members are accessed by one of
118 // these threads after the derived class is destroyed but before
119 // this base class is destroyed.
120 stop();
121}
122
123bool
128
129void
131{
132 {
134
136 {
137 JLOG(j_.debug()) << "Clearing read queue because of stop request";
138 read_.clear();
140 }
141 }
142
143 JLOG(j_.debug()) << "Waiting for stop request to complete...";
144
145 using namespace std::chrono;
146
147 auto const start = steady_clock::now();
148
149 while (readThreads_.load() != 0)
150 {
151 XRPL_ASSERT(
152 steady_clock::now() - start < 30s,
153 "ripple::NodeStore::Database::stop : maximum stop duration");
155 }
156
157 JLOG(j_.debug()) << "Stop request completed in "
158 << duration_cast<std::chrono::milliseconds>(
159 steady_clock::now() - start)
160 .count()
161 << " millseconds";
162}
163
164void
166 uint256 const& hash,
167 std::uint32_t ledgerSeq,
169{
171
172 if (!isStopping())
173 {
174 read_[hash].emplace_back(ledgerSeq, std::move(cb));
176 }
177}
178
179void
181{
182 Batch batch;
184 auto storeBatch = [&, fname = __func__]() {
185 try
186 {
187 dstBackend.storeBatch(batch);
188 }
189 catch (std::exception const& e)
190 {
191 JLOG(j_.error()) << "Exception caught in function " << fname
192 << ". Error: " << e.what();
193 return;
194 }
195
196 std::uint64_t sz{0};
197 for (auto const& nodeObject : batch)
198 sz += nodeObject->getData().size();
199 storeStats(batch.size(), sz);
200 batch.clear();
201 };
202
203 srcDB.for_each([&](std::shared_ptr<NodeObject> nodeObject) {
204 XRPL_ASSERT(
205 nodeObject,
206 "ripple::NodeStore::Database::importInternal : non-null node");
207 if (!nodeObject) // This should never happen
208 return;
209
210 batch.emplace_back(std::move(nodeObject));
212 storeBatch();
213 });
214
215 if (!batch.empty())
216 storeBatch();
217}
218
219// Perform a fetch and report the time it took
222 uint256 const& hash,
223 std::uint32_t ledgerSeq,
224 FetchType fetchType,
225 bool duplicate)
226{
227 FetchReport fetchReport(fetchType);
228
229 using namespace std::chrono;
230 auto const begin{steady_clock::now()};
231
232 auto nodeObject{fetchNodeObject(hash, ledgerSeq, fetchReport, duplicate)};
233 auto dur = steady_clock::now() - begin;
234 fetchDurationUs_ += duration_cast<microseconds>(dur).count();
235 if (nodeObject)
236 {
238 fetchSz_ += nodeObject->getData().size();
239 }
241
242 fetchReport.elapsed = duration_cast<milliseconds>(dur);
243 scheduler_.onFetch(fetchReport);
244 return nodeObject;
245}
246
247void
249{
250 XRPL_ASSERT(
251 obj.isObject(),
252 "ripple::NodeStore::Database::getCountsJson : valid input type");
253
254 {
256 obj["read_queue"] = static_cast<Json::UInt>(read_.size());
257 }
258
259 obj["read_threads_total"] = readThreads_.load();
260 obj["read_threads_running"] = runningThreads_.load();
261 obj["read_request_bundle"] = requestBundle_;
262
263 obj[jss::node_writes] = std::to_string(storeCount_);
264 obj[jss::node_reads_total] = std::to_string(fetchTotalCount_);
265 obj[jss::node_reads_hit] = std::to_string(fetchHitCount_);
266 obj[jss::node_written_bytes] = std::to_string(storeSz_);
267 obj[jss::node_read_bytes] = std::to_string(fetchSz_);
268 obj[jss::node_reads_duration_us] = std::to_string(fetchDurationUs_);
269}
270
271} // namespace NodeStore
272} // namespace ripple
Represents a JSON value.
Definition json_value.h:130
bool isObject() const
A generic endpoint for log messages.
Definition Journal.h:41
Stream error() const
Definition Journal.h:327
Stream debug() const
Definition Journal.h:309
A backend used for the NodeStore.
Definition Backend.h:21
virtual void storeBatch(Batch const &batch)=0
Store a group of objects.
Persistency layer for NodeObject.
Definition Database.h:32
void getCountsJson(Json::Value &obj)
Definition Database.cpp:248
std::atomic< std::uint32_t > fetchSz_
Definition Database.h:213
void storeStats(std::uint64_t count, std::uint64_t sz)
Definition Database.h:229
virtual void asyncFetch(uint256 const &hash, std::uint32_t ledgerSeq, std::function< void(std::shared_ptr< NodeObject > const &)> &&callback)
Fetch an object without waiting.
Definition Database.cpp:165
virtual void for_each(std::function< void(std::shared_ptr< NodeObject >)> f)=0
Visit every object in the database This is usually called during import.
virtual ~Database()
Destroy the node store.
Definition Database.cpp:112
std::condition_variable readCondVar_
Definition Database.h:258
std::atomic< std::uint64_t > storeCount_
Definition Database.h:251
std::uint32_t const earliestLedgerSeq_
Definition Database.h:221
std::map< uint256, std::vector< std::pair< std::uint32_t, std::function< void(std::shared_ptr< NodeObject > const &)> > > > read_
Definition Database.h:266
std::atomic< std::uint64_t > storeSz_
Definition Database.h:252
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq=0, FetchType fetchType=FetchType::synchronous, bool duplicate=false)
Fetch a node object.
Definition Database.cpp:221
std::atomic< bool > readStopping_
Definition Database.h:268
std::atomic< std::uint32_t > fetchHitCount_
Definition Database.h:212
beast::Journal const j_
Definition Database.h:208
std::atomic< std::uint64_t > fetchDurationUs_
Definition Database.h:254
std::atomic< int > runningThreads_
Definition Database.h:270
virtual bool isSameDB(std::uint32_t s1, std::uint32_t s2)=0
std::atomic< std::uint64_t > fetchTotalCount_
Definition Database.h:253
std::atomic< int > readThreads_
Definition Database.h:269
void importInternal(Backend &dstBackend, Database &srcDB)
Definition Database.cpp:180
Scheduling for asynchronous backend activity.
virtual void onFetch(FetchReport const &report)=0
Reports completion of a fetch Allows the scheduler to monitor the node store's performance.
Holds a collection of configuration values.
Definition BasicConfig.h:26
T detach(T... args)
T exchange(T... args)
T is_same_v
T load(T... args)
unsigned int UInt
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
void read(nudb::detail::istream &is, std::size_t &u)
Definition varint.h:102
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
static constexpr std::uint32_t XRP_LEDGER_EARLIEST_SEQ
The XRP ledger network's earliest allowed sequence.
T get(Section const &section, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.
STL namespace.
T reserve(T... args)
Contains information about a fetch operation.
T to_string(T... args)
T what(T... args)
T yield(T... args)