xrpld
Loading...
Searching...
No Matches
Database.cpp
1#include <xrpl/nodestore/Database.h>
2
3#include <xrpl/basics/Log.h>
4#include <xrpl/basics/base_uint.h>
5#include <xrpl/basics/contract.h>
6#include <xrpl/beast/core/CurrentThreadName.h>
7#include <xrpl/beast/utility/Journal.h>
8#include <xrpl/beast/utility/instrumentation.h>
9#include <xrpl/config/BasicConfig.h>
10#include <xrpl/config/Constants.h>
11#include <xrpl/json/json_forwards.h>
12#include <xrpl/json/json_value.h>
13#include <xrpl/nodestore/Backend.h>
14#include <xrpl/nodestore/NodeObject.h>
15#include <xrpl/nodestore/Scheduler.h>
16#include <xrpl/nodestore/Types.h>
17#include <xrpl/protocol/SystemParameters.h>
18#include <xrpl/protocol/jss.h>
19
20#include <algorithm>
21#include <atomic>
22#include <chrono>
23#include <cstdint>
24#include <exception>
25#include <functional>
26#include <memory>
27#include <mutex>
28#include <stdexcept>
29#include <string>
30#include <thread>
31#include <utility>
32
33namespace xrpl::NodeStore {
34
// NOTE(review): this is a Doxygen listing extraction. Every line keeps its
// listing number, and the numbering skips 35, 51, 56, 58, 67, 74, 76 and 118,
// so those statements are absent from this view. Presumably they hold the
// `Database::Database(` header, the rq_bundle range check, the std::thread
// declaration for `t`, the declaration of `lock`, and bookkeeping around the
// wait -- confirm against the real Database.cpp before relying on this text.
//
// Constructor: initialises members from the arguments and `config`, validates
// them, then runs one detached reader lambda per configured read thread. Each
// reader drains the shared `read_` queue and invokes the queued callbacks.
36 Scheduler& scheduler,
37 int readThreads,
38 Section const& config,
39 beast::Journal journal)
40 : j_(journal)
41 , scheduler_(scheduler)
// The third argument of get() is its default value, used here as the fallback
// for the earliest-sequence and request-bundle settings.
42 , earliestLedgerSeq_(get<std::uint32_t>(config, Keys::kEarliestSeq, kXrpLedgerEarliestSeq))
43 , requestBundle_(get<int>(config, Keys::kRqBundle, 4))
// Never run with fewer than one reader thread.
44 , readThreads_(std::max(1, readThreads))
45{
46 XRPL_ASSERT(readThreads, "xrpl::NodeStore::Database::Database : nonzero threads input");
47
48 if (earliestLedgerSeq_ < 1)
49 Throw<std::runtime_error>("Invalid earliest_seq");
50
// NOTE(review): the condition guarding this throw (listing line 51) is missing
// from the extraction; the accepted rq_bundle range cannot be read from here.
52 Throw<std::runtime_error>("Invalid rq_bundle");
53
54 for (int i = readThreads_.load(); i != 0; --i)
55 {
// NOTE(review): listing line 56 is missing; `t` (detached below) is presumably
// declared there with this lambda and `i` as its arguments.
// Inside the lambda, `i` is only used to build the thread name.
57 [this](int i) {
59
60 beast::setCurrentThreadName("db prefetch #" + std::to_string(i));
61
// Thread-local batch with the same container type as the shared queue.
62 decltype(read_) read;
63
64 while (true)
65 {
// Inner scope: the part of the loop that touches the shared queue.
// NOTE(review): `lock` is declared on missing listing line 67.
66 {
68
69 if (isStopping())
70 break;
71
72 if (read_.empty())
73 {
// Sleep until notified; asyncFetch() and stop() both notify readCondVar_.
75 readCondVar_.wait(lock);
77 }
78
// Re-check after waking: the wake-up may have come from stop().
79 if (isStopping())
80 break;
81
82 // Extract multiple objects at a time to minimize the
83 // overhead of acquiring the mutex.
84 for (int cnt = 0; !read_.empty() && cnt != requestBundle_; ++cnt)
85 read.insert(read_.extract(read_.begin()));
86 }
87
// Service the extracted requests outside the inner scope above.
88 for (auto it = read.begin(); it != read.end(); ++it)
89 {
90 XRPL_ASSERT(
91 !it->second.empty(),
92 "xrpl::NodeStore::Database::Database : non-empty "
93 "data");
94
95 auto const& hash = it->first;
96 auto const& data = it->second;
// Fetch once using the ledger sequence of the first request for this hash.
97 auto const seqn = data[0].first;
98
99 auto obj = fetchNodeObject(hash, seqn, FetchType::Async);
100
101 // This could be further optimized: if there are
102 // multiple requests for sequence numbers mapping to
103 // multiple databases by sorting requests such that all
104 // indices mapping to the same database are grouped
105 // together and serviced by a single read.
106 for (auto const& req : data)
107 {
// Reuse `obj` when the request has the same sequence or isSameDB() says the
// two sequences share a database; otherwise fetch again for that sequence.
108 req.second(
109 (seqn == req.first) || isSameDB(req.first, seqn)
110 ? obj
111 : fetchNodeObject(hash, req.first, FetchType::Async));
112 }
113 }
114
115 read.clear();
116 }
117
// stop() spins until readThreads_ reaches zero, so each reader decrements it
// on the way out of the loop.
119 --readThreads_;
120 },
121 i);
122 t.detach();
123 }
124}
125
127{
128 // NOTE!
129 // Any derived class should call the stop() method in its
130 // destructor. Otherwise, occasionally, the derived class may
131 // crash during shutdown when its members are accessed by one of
132 // these threads after the derived class is destroyed but before
133 // this base class is destroyed.
134 stop();
135}
136
// Reports whether a stop has been requested: returns the current value of
// readStopping_, which stop() sets to true. The load uses relaxed ordering.
// NOTE(review): listing line 138 (the `Database::isStopping` declarator) is
// missing from this extraction.
137bool
139{
140 return readStopping_.load(std::memory_order_relaxed);
141}
142
143void
145{
146 {
147 std::scoped_lock const lock(readLock_);
148
149 if (!readStopping_.exchange(true, std::memory_order_relaxed))
150 {
151 JLOG(j_.debug()) << "Clearing read queue because of stop request";
152 read_.clear();
153 readCondVar_.notify_all();
154 }
155 }
156
157 JLOG(j_.debug()) << "Waiting for stop request to complete...";
158
159 using namespace std::chrono;
160
161 auto const start = steady_clock::now();
162
163 while (readThreads_.load() != 0)
164 {
165 XRPL_ASSERT(
166 steady_clock::now() - start < 30s,
167 "xrpl::NodeStore::Database::stop : maximum stop duration");
169 }
170
171 JLOG(j_.debug())
172 << "Stop request completed in "
174 << " milliseconds";
175}
176
177void
179 uint256 const& hash,
180 std::uint32_t ledgerSeq,
182{
184
185 if (!isStopping())
186 {
187 read_[hash].emplace_back(ledgerSeq, std::move(cb));
188 readCondVar_.notify_one();
189 }
190}
191
192void
194{
195 Batch batch;
197 auto storeBatch = [&, fname = __func__]() {
198 try
199 {
200 dstBackend.storeBatch(batch);
201 }
202 catch (std::exception const& e)
203 {
204 JLOG(j_.error()) << "Exception caught in function " << fname << ". Error: " << e.what();
205 return;
206 }
207
208 std::uint64_t sz{0};
209 for (auto const& nodeObject : batch)
210 sz += nodeObject->getData().size();
211 storeStats(batch.size(), sz);
212 batch.clear();
213 };
214
215 srcDB.forEach([&](std::shared_ptr<NodeObject> nodeObject) {
216 XRPL_ASSERT(nodeObject, "xrpl::NodeStore::Database::importInternal : non-null node");
217 if (!nodeObject) // This should never happen
218 return;
219
220 batch.emplace_back(std::move(nodeObject));
221 if (batch.size() >= kBatchWritePreallocationSize)
222 storeBatch();
223 });
224
225 if (!batch.empty())
226 storeBatch();
227}
228
229// Perform a fetch and report the time it took
232 uint256 const& hash,
233 std::uint32_t ledgerSeq,
234 FetchType fetchType,
235 bool duplicate)
236{
237 FetchReport fetchReport(fetchType);
238
239 using namespace std::chrono;
240 auto const begin{steady_clock::now()};
241
242 auto nodeObject{fetchNodeObject(hash, ledgerSeq, fetchReport, duplicate)};
243 auto dur = steady_clock::now() - begin;
245 if (nodeObject)
246 {
248 fetchSz_ += nodeObject->getData().size();
249 }
251
252 fetchReport.elapsed = duration_cast<milliseconds>(dur);
253 scheduler_.onFetch(fetchReport);
254 return nodeObject;
255}
256
257void
259{
260 XRPL_ASSERT(obj.isObject(), "xrpl::NodeStore::Database::getCountsJson : valid input type");
261
262 {
264 obj["read_queue"] = static_cast<json::UInt>(read_.size());
265 }
266
267 obj["read_threads_total"] = readThreads_.load();
268 obj["read_threads_running"] = runningThreads_.load();
269 obj["read_request_bundle"] = requestBundle_;
270
271 obj[jss::node_writes] = std::to_string(storeCount_);
272 obj[jss::node_reads_total] = std::to_string(fetchTotalCount_);
273 obj[jss::node_reads_hit] = std::to_string(fetchHitCount_);
274 obj[jss::node_written_bytes] = std::to_string(storeSz_);
275 obj[jss::node_read_bytes] = std::to_string(fetchSz_);
276 obj[jss::node_reads_duration_us] = std::to_string(fetchDurationUs_);
277}
278
279} // namespace xrpl::NodeStore
T begin(T... args)
A generic endpoint for log messages.
Definition Journal.h:38
Represents a JSON value.
Definition json_value.h:130
bool isObject() const
A backend used for the NodeStore.
Definition Backend.h:19
virtual void storeBatch(Batch const &batch)=0
Store a group of objects.
void storeStats(std::uint64_t count, std::uint64_t sz)
Definition Database.h:221
std::condition_variable readCondVar_
Definition Database.h:248
std::atomic< std::uint64_t > fetchTotalCount_
Definition Database.h:243
std::atomic< int > readThreads_
Definition Database.h:258
std::atomic< int > runningThreads_
Definition Database.h:259
beast::Journal const j_
Definition Database.h:200
std::map< uint256, std::vector< std::pair< std::uint32_t, std::function< void(std::shared_ptr< NodeObject > const &)> > > > read_
Definition Database.h:255
std::atomic< bool > readStopping_
Definition Database.h:257
virtual void asyncFetch(uint256 const &hash, std::uint32_t ledgerSeq, std::function< void(std::shared_ptr< NodeObject > const &)> &&callback)
Fetch an object without waiting.
Definition Database.cpp:178
std::atomic< std::uint64_t > fetchDurationUs_
Definition Database.h:244
virtual bool isSameDB(std::uint32_t s1, std::uint32_t s2)=0
virtual ~Database()
Destroy the node store.
Definition Database.cpp:126
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq=0, FetchType fetchType=FetchType::Synchronous, bool duplicate=false)
Fetch a node object.
Definition Database.cpp:231
std::atomic< std::uint64_t > storeSz_
Definition Database.h:242
std::uint32_t const earliestLedgerSeq_
Definition Database.h:213
std::atomic< std::uint32_t > fetchHitCount_
Definition Database.h:204
std::atomic< std::uint64_t > storeCount_
Definition Database.h:241
void getCountsJson(json::Value &obj)
Definition Database.cpp:258
virtual void forEach(std::function< void(std::shared_ptr< NodeObject >)> f)=0
Visit every object in the database. This is usually called during import.
std::atomic< std::uint32_t > fetchSz_
Definition Database.h:205
void importInternal(Backend &dstBackend, Database &srcDB)
Definition Database.cpp:193
Scheduling for asynchronous backend activity.
Holds a collection of configuration values.
Definition BasicConfig.h:24
T detach(T... args)
T duration_cast(T... args)
T emplace_back(T... args)
T empty(T... args)
T lock(T... args)
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
unsigned int UInt
STL namespace.
void read(nudb::detail::istream &is, std::size_t &u)
Definition varint.h:104
static constexpr auto kBatchWritePreallocationSize
std::vector< std::shared_ptr< NodeObject > > Batch
A batch of NodeObjects to write at once.
T get(Section const &section, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.
static constexpr std::uint32_t kXrpLedgerEarliestSeq
The XRP ledger network's earliest allowed sequence.
BaseUInt< 256 > uint256
Definition base_uint.h:562
XRPL_NO_SANITIZE_ADDRESS void Throw(Args &&... args)
Definition contract.h:49
T reserve(T... args)
T size(T... args)
Contains information about a fetch operation.
T to_string(T... args)
T what(T... args)
T yield(T... args)