rippled
Loading...
Searching...
No Matches
NuDBFactory.cpp
#include <xrpl/basics/contract.h>
#include <xrpl/beast/core/LexicalCast.h>
#include <xrpl/beast/utility/instrumentation.h>
#include <xrpl/nodestore/Factory.h>
#include <xrpl/nodestore/Manager.h>
#include <xrpl/nodestore/detail/DecodedBlob.h>
#include <xrpl/nodestore/detail/EncodedBlob.h>
#include <xrpl/nodestore/detail/codec.h>

#include <boost/filesystem.hpp>

#include <nudb/nudb.hpp>

#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <exception>
#include <functional>
#include <memory>
#include <optional>
#include <sstream>
#include <string>
#include <utility>
#include <vector>
20namespace ripple {
21namespace NodeStore {
22
23class NuDBBackend : public Backend
24{
25public:
26 // "appnum" is an application-defined constant stored in the header of a
27 // NuDB database. We used it to identify shard databases before that code
28 // was removed. For now, its only use is a sanity check that the database
29 // was created by xrpld.
30 static constexpr std::uint64_t appnum = 1;
31
33 size_t const keyBytes_;
37 nudb::store db_;
40
42 size_t keyBytes,
43 Section const& keyValues,
45 Scheduler& scheduler,
46 beast::Journal journal)
47 : j_(journal)
48 , keyBytes_(keyBytes)
50 , name_(get(keyValues, "path"))
51 , blockSize_(parseBlockSize(name_, keyValues, journal))
52 , deletePath_(false)
53 , scheduler_(scheduler)
54 {
55 if (name_.empty())
56 Throw<std::runtime_error>(
57 "nodestore: Missing path in NuDB backend");
58 }
59
61 size_t keyBytes,
62 Section const& keyValues,
64 Scheduler& scheduler,
65 nudb::context& context,
66 beast::Journal journal)
67 : j_(journal)
68 , keyBytes_(keyBytes)
70 , name_(get(keyValues, "path"))
71 , blockSize_(parseBlockSize(name_, keyValues, journal))
72 , db_(context)
73 , deletePath_(false)
74 , scheduler_(scheduler)
75 {
76 if (name_.empty())
77 Throw<std::runtime_error>(
78 "nodestore: Missing path in NuDB backend");
79 }
80
81 ~NuDBBackend() override
82 {
83 try
84 {
85 // close can throw and we don't want the destructor to throw.
86 close();
87 }
88 catch (nudb::system_error const&)
89 {
90 // Don't allow exceptions to propagate out of destructors.
91 // close() has already logged the error.
92 }
93 }
94
96 getName() override
97 {
98 return name_;
99 }
100
102 getBlockSize() const override
103 {
104 return blockSize_;
105 }
106
107 void
108 open(bool createIfMissing, uint64_t appType, uint64_t uid, uint64_t salt)
109 override
110 {
111 using namespace boost::filesystem;
112 if (db_.is_open())
113 {
114 // LCOV_EXCL_START
115 UNREACHABLE(
116 "ripple::NodeStore::NuDBBackend::open : database is already "
117 "open");
118 JLOG(j_.error()) << "database is already open";
119 return;
120 // LCOV_EXCL_STOP
121 }
122 auto const folder = path(name_);
123 auto const dp = (folder / "nudb.dat").string();
124 auto const kp = (folder / "nudb.key").string();
125 auto const lp = (folder / "nudb.log").string();
126 nudb::error_code ec;
127 if (createIfMissing)
128 {
129 create_directories(folder);
130 nudb::create<nudb::xxhasher>(
131 dp,
132 kp,
133 lp,
134 appType,
135 uid,
136 salt,
137 keyBytes_,
139 0.50,
140 ec);
141 if (ec == nudb::errc::file_exists)
142 ec = {};
143 if (ec)
144 Throw<nudb::system_error>(ec);
145 }
146 db_.open(dp, kp, lp, ec);
147 if (ec)
148 Throw<nudb::system_error>(ec);
149
150 if (db_.appnum() != appnum)
151 Throw<std::runtime_error>("nodestore: unknown appnum");
152 db_.set_burst(burstSize_);
153 }
154
155 bool
156 isOpen() override
157 {
158 return db_.is_open();
159 }
160
161 void
162 open(bool createIfMissing) override
163 {
164 open(createIfMissing, appnum, nudb::make_uid(), nudb::make_salt());
165 }
166
167 void
168 close() override
169 {
170 if (db_.is_open())
171 {
172 nudb::error_code ec;
173 db_.close(ec);
174 if (ec)
175 {
176 // Log to make sure the nature of the error gets to the user.
177 JLOG(j_.fatal()) << "NuBD close() failed: " << ec.message();
178 Throw<nudb::system_error>(ec);
179 }
180
181 if (deletePath_)
182 {
183 boost::filesystem::remove_all(name_, ec);
184 if (ec)
185 {
186 JLOG(j_.fatal()) << "Filesystem remove_all of " << name_
187 << " failed with: " << ec.message();
188 }
189 }
190 }
191 }
192
193 Status
194 fetch(void const* key, std::shared_ptr<NodeObject>* pno) override
195 {
196 Status status;
197 pno->reset();
198 nudb::error_code ec;
199 db_.fetch(
200 key,
201 [key, pno, &status](void const* data, std::size_t size) {
202 nudb::detail::buffer bf;
203 auto const result = nodeobject_decompress(data, size, bf);
204 DecodedBlob decoded(key, result.first, result.second);
205 if (!decoded.wasOk())
206 {
207 status = dataCorrupt;
208 return;
209 }
210 *pno = decoded.createObject();
211 status = ok;
212 },
213 ec);
214 if (ec == nudb::error::key_not_found)
215 return notFound;
216 if (ec)
217 Throw<nudb::system_error>(ec);
218 return status;
219 }
220
223 {
225 results.reserve(hashes.size());
226 for (auto const& h : hashes)
227 {
229 Status status = fetch(h->begin(), &nObj);
230 if (status != ok)
231 results.push_back({});
232 else
233 results.push_back(nObj);
234 }
235
236 return {results, ok};
237 }
238
239 void
241 {
242 EncodedBlob e(no);
243 nudb::error_code ec;
244 nudb::detail::buffer bf;
245 auto const result = nodeobject_compress(e.getData(), e.getSize(), bf);
246 db_.insert(e.getKey(), result.first, result.second, ec);
247 if (ec && ec != nudb::error::key_exists)
248 Throw<nudb::system_error>(ec);
249 }
250
251 void
253 {
254 BatchWriteReport report;
255 report.writeCount = 1;
256 auto const start = std::chrono::steady_clock::now();
257 do_insert(no);
258 report.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
260 scheduler_.onBatchWrite(report);
261 }
262
263 void
264 storeBatch(Batch const& batch) override
265 {
266 BatchWriteReport report;
267 report.writeCount = batch.size();
268 auto const start = std::chrono::steady_clock::now();
269 for (auto const& e : batch)
270 do_insert(e);
271 report.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
273 scheduler_.onBatchWrite(report);
274 }
275
276 void
277 sync() override
278 {
279 }
280
281 void
283 {
284 auto const dp = db_.dat_path();
285 auto const kp = db_.key_path();
286 auto const lp = db_.log_path();
287 // auto const appnum = db_.appnum();
288 nudb::error_code ec;
289 db_.close(ec);
290 if (ec)
291 Throw<nudb::system_error>(ec);
292 nudb::visit(
293 dp,
294 [&](void const* key,
295 std::size_t key_bytes,
296 void const* data,
297 std::size_t size,
298 nudb::error_code&) {
299 nudb::detail::buffer bf;
300 auto const result = nodeobject_decompress(data, size, bf);
301 DecodedBlob decoded(key, result.first, result.second);
302 if (!decoded.wasOk())
303 {
304 ec = make_error_code(nudb::error::missing_value);
305 return;
306 }
307 f(decoded.createObject());
308 },
309 nudb::no_progress{},
310 ec);
311 if (ec)
312 Throw<nudb::system_error>(ec);
313 db_.open(dp, kp, lp, ec);
314 if (ec)
315 Throw<nudb::system_error>(ec);
316 }
317
318 int
319 getWriteLoad() override
320 {
321 return 0;
322 }
323
324 void
325 setDeletePath() override
326 {
327 deletePath_ = true;
328 }
329
330 void
331 verify() override
332 {
333 auto const dp = db_.dat_path();
334 auto const kp = db_.key_path();
335 auto const lp = db_.log_path();
336 nudb::error_code ec;
337 db_.close(ec);
338 if (ec)
339 Throw<nudb::system_error>(ec);
340 nudb::verify_info vi;
341 nudb::verify<nudb::xxhasher>(vi, dp, kp, 0, nudb::no_progress{}, ec);
342 if (ec)
343 Throw<nudb::system_error>(ec);
344 db_.open(dp, kp, lp, ec);
345 if (ec)
346 Throw<nudb::system_error>(ec);
347 }
348
349 int
350 fdRequired() const override
351 {
352 return 3;
353 }
354
355private:
356 static std::size_t
358 std::string const& name,
359 Section const& keyValues,
360 beast::Journal journal)
361 {
362 using namespace boost::filesystem;
363 auto const folder = path(name);
364 auto const kp = (folder / "nudb.key").string();
365
366 std::size_t const defaultSize =
367 nudb::block_size(kp); // Default 4K from NuDB
368 std::size_t blockSize = defaultSize;
369 std::string blockSizeStr;
370
371 if (!get_if_exists(keyValues, "nudb_block_size", blockSizeStr))
372 {
373 return blockSize; // Early return with default
374 }
375
376 try
377 {
378 std::size_t const parsedBlockSize =
379 beast::lexicalCastThrow<std::size_t>(blockSizeStr);
380
381 // Validate: must be power of 2 between 4K and 32K
382 if (parsedBlockSize < 4096 || parsedBlockSize > 32768 ||
383 (parsedBlockSize & (parsedBlockSize - 1)) != 0)
384 {
386 s << "Invalid nudb_block_size: " << parsedBlockSize
387 << ". Must be power of 2 between 4096 and 32768.";
388 Throw<std::runtime_error>(s.str());
389 }
390
391 JLOG(journal.info())
392 << "Using custom NuDB block size: " << parsedBlockSize
393 << " bytes";
394 return parsedBlockSize;
395 }
396 catch (std::exception const& e)
397 {
399 s << "Invalid nudb_block_size value: " << blockSizeStr
400 << ". Error: " << e.what();
401 Throw<std::runtime_error>(s.str());
402 }
403 }
404};
405
406//------------------------------------------------------------------------------
407
408class NuDBFactory : public Factory
409{
410private:
412
413public:
414 explicit NuDBFactory(Manager& manager) : manager_(manager)
415 {
416 manager_.insert(*this);
417 }
418
420 getName() const override
421 {
422 return "NuDB";
423 }
424
427 size_t keyBytes,
428 Section const& keyValues,
430 Scheduler& scheduler,
431 beast::Journal journal) override
432 {
434 keyBytes, keyValues, burstSize, scheduler, journal);
435 }
436
439 size_t keyBytes,
440 Section const& keyValues,
442 Scheduler& scheduler,
443 nudb::context& context,
444 beast::Journal journal) override
445 {
447 keyBytes, keyValues, burstSize, scheduler, context, journal);
448 }
449};
450
451void
453{
454 static NuDBFactory instance{manager};
455}
456
457} // namespace NodeStore
458} // namespace ripple
A generic endpoint for log messages.
Definition Journal.h:41
Stream fatal() const
Definition Journal.h:333
Stream error() const
Definition Journal.h:327
Stream info() const
Definition Journal.h:315
A backend used for the NodeStore.
Definition Backend.h:21
Parsed key/value blob into NodeObject components.
Definition DecodedBlob.h:20
std::shared_ptr< NodeObject > createObject()
Create a NodeObject from this data.
bool wasOk() const noexcept
Determine if the decoding was successful.
Definition DecodedBlob.h:27
Convert a NodeObject from in-memory to database format.
Definition EncodedBlob.h:37
void const * getKey() const noexcept
Definition EncodedBlob.h:98
std::size_t getSize() const noexcept
void const * getData() const noexcept
Base class for backend factories.
Definition Factory.h:17
Singleton for managing NodeStore factories and back ends.
Definition Manager.h:13
virtual void insert(Factory &factory)=0
Add a factory.
void store(std::shared_ptr< NodeObject > const &no) override
Store a single object.
NuDBBackend(size_t keyBytes, Section const &keyValues, std::size_t burstSize, Scheduler &scheduler, beast::Journal journal)
Status fetch(void const *key, std::shared_ptr< NodeObject > *pno) override
Fetch a single object.
void open(bool createIfMissing) override
Open the backend.
std::pair< std::vector< std::shared_ptr< NodeObject > >, Status > fetchBatch(std::vector< uint256 const * > const &hashes) override
Fetch a batch synchronously.
static std::size_t parseBlockSize(std::string const &name, Section const &keyValues, beast::Journal journal)
static constexpr std::uint64_t appnum
void close() override
Close the backend.
NuDBBackend(size_t keyBytes, Section const &keyValues, std::size_t burstSize, Scheduler &scheduler, nudb::context &context, beast::Journal journal)
void storeBatch(Batch const &batch) override
Store a group of objects.
void do_insert(std::shared_ptr< NodeObject > const &no)
std::optional< std::size_t > getBlockSize() const override
Get the block size for backends that support it.
int fdRequired() const override
Returns the number of file descriptors the backend expects to need.
std::string getName() override
Get the human-readable name of this backend.
void open(bool createIfMissing, uint64_t appType, uint64_t uid, uint64_t salt) override
Open the backend.
void verify() override
Perform consistency checks on database.
void for_each(std::function< void(std::shared_ptr< NodeObject >)> f) override
Visit every object in the database This is usually called during import.
std::atomic< bool > deletePath_
bool isOpen() override
Returns true if the database is open.
int getWriteLoad() override
Estimate the number of write operations pending.
void setDeletePath() override
Remove contents on disk upon destruction.
std::unique_ptr< Backend > createInstance(size_t keyBytes, Section const &keyValues, std::size_t burstSize, Scheduler &scheduler, nudb::context &context, beast::Journal journal) override
Create an instance of this factory's backend.
std::string getName() const override
Retrieve the name of this factory.
std::unique_ptr< Backend > createInstance(size_t keyBytes, Section const &keyValues, std::size_t burstSize, Scheduler &scheduler, beast::Journal journal) override
Create an instance of this factory's backend.
Scheduling for asynchronous backend activity.
virtual void onBatchWrite(BatchWriteReport const &report)=0
Reports the completion of a batch write Allows the scheduler to monitor the node store's performance.
Holds a collection of configuration values.
Definition BasicConfig.h:26
T empty(T... args)
T is_same_v
void registerNuDBFactory(Manager &manager)
std::pair< void const *, std::size_t > nodeobject_decompress(void const *in, std::size_t in_size, BufferFactory &&bf)
Definition codec.h:91
std::pair< void const *, std::size_t > nodeobject_compress(void const *in, std::size_t in_size, BufferFactory &&bf)
Definition codec.h:202
Status
Return codes from Backend operations.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
bool get_if_exists(Section const &section, std::string const &name, T &v)
std::error_code make_error_code(ripple::TokenCodecErrc e)
@ open
We haven't closed our ledger yet, but others might have.
@ no
Definition Steps.h:26
T get(Section const &section, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.
T push_back(T... args)
T reserve(T... args)
T reset(T... args)
T size(T... args)
T str(T... args)
Contains information about a batch write operation.
T what(T... args)