rippled
Loading...
Searching...
No Matches
NuDBFactory.cpp
1#include <xrpl/basics/contract.h>
2#include <xrpl/beast/core/LexicalCast.h>
3#include <xrpl/beast/utility/instrumentation.h>
4#include <xrpl/nodestore/Factory.h>
5#include <xrpl/nodestore/Manager.h>
6#include <xrpl/nodestore/detail/DecodedBlob.h>
7#include <xrpl/nodestore/detail/EncodedBlob.h>
8#include <xrpl/nodestore/detail/codec.h>
9
10#include <boost/filesystem.hpp>
11
12#include <nudb/nudb.hpp>
13
14#include <chrono>
15#include <cstdint>
16#include <cstdio>
17#include <exception>
18#include <memory>
19
20namespace xrpl {
21namespace NodeStore {
22
23class NuDBBackend : public Backend
24{
25public:
26 // "appnum" is an application-defined constant stored in the header of a
27 // NuDB database. We used it to identify shard databases before that code
28 // was removed. For now, its only use is a sanity check that the database
29 // was created by xrpld.
30 static constexpr std::uint64_t appnum = 1;
31
33 size_t const keyBytes_;
37 nudb::store db_;
40
42 size_t keyBytes,
43 Section const& keyValues,
45 Scheduler& scheduler,
46 beast::Journal journal)
47 : j_(journal)
48 , keyBytes_(keyBytes)
50 , name_(get(keyValues, "path"))
51 , blockSize_(parseBlockSize(name_, keyValues, journal))
52 , deletePath_(false)
53 , scheduler_(scheduler)
54 {
55 if (name_.empty())
56 Throw<std::runtime_error>(
57 "nodestore: Missing path in NuDB backend");
58 }
59
61 size_t keyBytes,
62 Section const& keyValues,
64 Scheduler& scheduler,
65 nudb::context& context,
66 beast::Journal journal)
67 : j_(journal)
68 , keyBytes_(keyBytes)
70 , name_(get(keyValues, "path"))
71 , blockSize_(parseBlockSize(name_, keyValues, journal))
72 , db_(context)
73 , deletePath_(false)
74 , scheduler_(scheduler)
75 {
76 if (name_.empty())
77 Throw<std::runtime_error>(
78 "nodestore: Missing path in NuDB backend");
79 }
80
81 ~NuDBBackend() override
82 {
83 try
84 {
85 // close can throw and we don't want the destructor to throw.
86 close();
87 }
88 catch (nudb::system_error const&)
89 {
90 // Don't allow exceptions to propagate out of destructors.
91 // close() has already logged the error.
92 }
93 }
94
96 getName() override
97 {
98 return name_;
99 }
100
102 getBlockSize() const override
103 {
104 return blockSize_;
105 }
106
107 void
108 open(bool createIfMissing, uint64_t appType, uint64_t uid, uint64_t salt)
109 override
110 {
111 using namespace boost::filesystem;
112 if (db_.is_open())
113 {
114 // LCOV_EXCL_START
115 UNREACHABLE(
116 "xrpl::NodeStore::NuDBBackend::open : database is already "
117 "open");
118 JLOG(j_.error()) << "database is already open";
119 return;
120 // LCOV_EXCL_STOP
121 }
122 auto const folder = path(name_);
123 auto const dp = (folder / "nudb.dat").string();
124 auto const kp = (folder / "nudb.key").string();
125 auto const lp = (folder / "nudb.log").string();
126 nudb::error_code ec;
127 if (createIfMissing)
128 {
129 create_directories(folder);
130 nudb::create<nudb::xxhasher>(
131 dp,
132 kp,
133 lp,
134 appType,
135 uid,
136 salt,
137 keyBytes_,
139 0.50,
140 ec);
141 if (ec == nudb::errc::file_exists)
142 ec = {};
143 if (ec)
144 Throw<nudb::system_error>(ec);
145 }
146 db_.open(dp, kp, lp, ec);
147 if (ec)
148 Throw<nudb::system_error>(ec);
149
150 if (db_.appnum() != appnum)
151 Throw<std::runtime_error>("nodestore: unknown appnum");
152 db_.set_burst(burstSize_);
153 }
154
155 bool
156 isOpen() override
157 {
158 return db_.is_open();
159 }
160
161 void
162 open(bool createIfMissing) override
163 {
164 open(createIfMissing, appnum, nudb::make_uid(), nudb::make_salt());
165 }
166
167 void
168 close() override
169 {
170 if (db_.is_open())
171 {
172 nudb::error_code ec;
173 db_.close(ec);
174 if (ec)
175 {
176 // Log to make sure the nature of the error gets to the user.
177 JLOG(j_.fatal()) << "NuBD close() failed: " << ec.message();
178 Throw<nudb::system_error>(ec);
179 }
180
181 if (deletePath_)
182 {
183 boost::filesystem::remove_all(name_, ec);
184 if (ec)
185 {
186 JLOG(j_.fatal()) << "Filesystem remove_all of " << name_
187 << " failed with: " << ec.message();
188 }
189 }
190 }
191 }
192
193 Status
194 fetch(void const* key, std::shared_ptr<NodeObject>* pno) override
195 {
196 Status status;
197 pno->reset();
198 nudb::error_code ec;
199 db_.fetch(
200 key,
201 [key, pno, &status](void const* data, std::size_t size) {
202 nudb::detail::buffer bf;
203 auto const result = nodeobject_decompress(data, size, bf);
204 DecodedBlob decoded(key, result.first, result.second);
205 if (!decoded.wasOk())
206 {
207 status = dataCorrupt;
208 return;
209 }
210 *pno = decoded.createObject();
211 status = ok;
212 },
213 ec);
214 if (ec == nudb::error::key_not_found)
215 return notFound;
216 if (ec)
217 Throw<nudb::system_error>(ec);
218 return status;
219 }
220
223 {
225 results.reserve(hashes.size());
226 for (auto const& h : hashes)
227 {
229 Status status = fetch(h->begin(), &nObj);
230 if (status != ok)
231 results.push_back({});
232 else
233 results.push_back(nObj);
234 }
235
236 return {results, ok};
237 }
238
239 void
241 {
242 EncodedBlob e(no);
243 nudb::error_code ec;
244 nudb::detail::buffer bf;
245 auto const result = nodeobject_compress(e.getData(), e.getSize(), bf);
246 db_.insert(e.getKey(), result.first, result.second, ec);
247 if (ec && ec != nudb::error::key_exists)
248 Throw<nudb::system_error>(ec);
249 }
250
251 void
253 {
254 BatchWriteReport report;
255 report.writeCount = 1;
256 auto const start = std::chrono::steady_clock::now();
257 do_insert(no);
258 report.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
260 scheduler_.onBatchWrite(report);
261 }
262
263 void
264 storeBatch(Batch const& batch) override
265 {
266 BatchWriteReport report;
267 report.writeCount = batch.size();
268 auto const start = std::chrono::steady_clock::now();
269 for (auto const& e : batch)
270 do_insert(e);
271 report.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
273 scheduler_.onBatchWrite(report);
274 }
275
276 void
277 sync() override
278 {
279 }
280
281 void
283 {
284 auto const dp = db_.dat_path();
285 auto const kp = db_.key_path();
286 auto const lp = db_.log_path();
287 // auto const appnum = db_.appnum();
288 nudb::error_code ec;
289 db_.close(ec);
290 if (ec)
291 Throw<nudb::system_error>(ec);
292 nudb::visit(
293 dp,
294 [&](void const* key,
295 std::size_t key_bytes,
296 void const* data,
297 std::size_t size,
298 nudb::error_code&) {
299 nudb::detail::buffer bf;
300 auto const result = nodeobject_decompress(data, size, bf);
301 DecodedBlob decoded(key, result.first, result.second);
302 if (!decoded.wasOk())
303 {
304 ec = make_error_code(nudb::error::missing_value);
305 return;
306 }
307 f(decoded.createObject());
308 },
309 nudb::no_progress{},
310 ec);
311 if (ec)
312 Throw<nudb::system_error>(ec);
313 db_.open(dp, kp, lp, ec);
314 if (ec)
315 Throw<nudb::system_error>(ec);
316 }
317
318 int
319 getWriteLoad() override
320 {
321 return 0;
322 }
323
324 void
325 setDeletePath() override
326 {
327 deletePath_ = true;
328 }
329
330 void
331 verify() override
332 {
333 auto const dp = db_.dat_path();
334 auto const kp = db_.key_path();
335 auto const lp = db_.log_path();
336 nudb::error_code ec;
337 db_.close(ec);
338 if (ec)
339 Throw<nudb::system_error>(ec);
340 nudb::verify_info vi;
341 nudb::verify<nudb::xxhasher>(vi, dp, kp, 0, nudb::no_progress{}, ec);
342 if (ec)
343 Throw<nudb::system_error>(ec);
344 db_.open(dp, kp, lp, ec);
345 if (ec)
346 Throw<nudb::system_error>(ec);
347 }
348
349 int
350 fdRequired() const override
351 {
352 return 3;
353 }
354
355private:
356 static std::size_t
358 std::string const& name,
359 Section const& keyValues,
360 beast::Journal journal)
361 {
362 using namespace boost::filesystem;
363 auto const folder = path(name);
364 auto const kp = (folder / "nudb.key").string();
365
366 std::size_t const defaultSize =
367 nudb::block_size(kp); // Default 4K from NuDB
368 std::size_t blockSize = defaultSize;
369 std::string blockSizeStr;
370
371 if (!get_if_exists(keyValues, "nudb_block_size", blockSizeStr))
372 {
373 return blockSize; // Early return with default
374 }
375
376 try
377 {
378 std::size_t const parsedBlockSize =
379 beast::lexicalCastThrow<std::size_t>(blockSizeStr);
380
381 // Validate: must be power of 2 between 4K and 32K
382 if (parsedBlockSize < 4096 || parsedBlockSize > 32768 ||
383 (parsedBlockSize & (parsedBlockSize - 1)) != 0)
384 {
386 s << "Invalid nudb_block_size: " << parsedBlockSize
387 << ". Must be power of 2 between 4096 and 32768.";
388 Throw<std::runtime_error>(s.str());
389 }
390
391 JLOG(journal.info())
392 << "Using custom NuDB block size: " << parsedBlockSize
393 << " bytes";
394 return parsedBlockSize;
395 }
396 catch (std::exception const& e)
397 {
399 s << "Invalid nudb_block_size value: " << blockSizeStr
400 << ". Error: " << e.what();
401 Throw<std::runtime_error>(s.str());
402 }
403 }
404};
405
406//------------------------------------------------------------------------------
407
408class NuDBFactory : public Factory
409{
410private:
412
413public:
414 explicit NuDBFactory(Manager& manager) : manager_(manager)
415 {
416 manager_.insert(*this);
417 }
418
420 getName() const override
421 {
422 return "NuDB";
423 }
424
427 size_t keyBytes,
428 Section const& keyValues,
430 Scheduler& scheduler,
431 beast::Journal journal) override
432 {
434 keyBytes, keyValues, burstSize, scheduler, journal);
435 }
436
439 size_t keyBytes,
440 Section const& keyValues,
442 Scheduler& scheduler,
443 nudb::context& context,
444 beast::Journal journal) override
445 {
447 keyBytes, keyValues, burstSize, scheduler, context, journal);
448 }
449};
450
451void
453{
454 static NuDBFactory instance{manager};
455}
456
457} // namespace NodeStore
458} // namespace xrpl
A generic endpoint for log messages.
Definition Journal.h:41
Stream fatal() const
Definition Journal.h:333
Stream error() const
Definition Journal.h:327
Stream info() const
Definition Journal.h:315
A backend used for the NodeStore.
Definition Backend.h:21
Parsed key/value blob into NodeObject components.
Definition DecodedBlob.h:20
std::shared_ptr< NodeObject > createObject()
Create a NodeObject from this data.
bool wasOk() const noexcept
Determine if the decoding was successful.
Definition DecodedBlob.h:27
Convert a NodeObject from in-memory to database format.
Definition EncodedBlob.h:37
void const * getKey() const noexcept
Definition EncodedBlob.h:98
std::size_t getSize() const noexcept
void const * getData() const noexcept
Base class for backend factories.
Definition Factory.h:17
Singleton for managing NodeStore factories and back ends.
Definition Manager.h:13
virtual void insert(Factory &factory)=0
Add a factory.
NuDBBackend(size_t keyBytes, Section const &keyValues, std::size_t burstSize, Scheduler &scheduler, beast::Journal journal)
std::size_t const blockSize_
void verify() override
Perform consistency checks on database.
void storeBatch(Batch const &batch) override
Store a group of objects.
int getWriteLoad() override
Estimate the number of write operations pending.
std::pair< std::vector< std::shared_ptr< NodeObject > >, Status > fetchBatch(std::vector< uint256 const * > const &hashes) override
Fetch a batch synchronously.
std::optional< std::size_t > getBlockSize() const override
Get the block size for backends that support it.
static std::size_t parseBlockSize(std::string const &name, Section const &keyValues, beast::Journal journal)
int fdRequired() const override
Returns the number of file descriptors the backend expects to need.
void store(std::shared_ptr< NodeObject > const &no) override
Store a single object.
beast::Journal const j_
static constexpr std::uint64_t appnum
NuDBBackend(size_t keyBytes, Section const &keyValues, std::size_t burstSize, Scheduler &scheduler, nudb::context &context, beast::Journal journal)
std::string getName() override
Get the human-readable name of this backend.
Status fetch(void const *key, std::shared_ptr< NodeObject > *pno) override
Fetch a single object.
void do_insert(std::shared_ptr< NodeObject > const &no)
void open(bool createIfMissing) override
Open the backend.
void open(bool createIfMissing, uint64_t appType, uint64_t uid, uint64_t salt) override
Open the backend.
std::size_t const burstSize_
void close() override
Close the backend.
void setDeletePath() override
Remove contents on disk upon destruction.
bool isOpen() override
Returns true if the database is open.
void for_each(std::function< void(std::shared_ptr< NodeObject >)> f) override
Visit every object in the database. This is usually called during import.
std::atomic< bool > deletePath_
std::unique_ptr< Backend > createInstance(size_t keyBytes, Section const &keyValues, std::size_t burstSize, Scheduler &scheduler, nudb::context &context, beast::Journal journal) override
Create an instance of this factory's backend.
NuDBFactory(Manager &manager)
std::unique_ptr< Backend > createInstance(size_t keyBytes, Section const &keyValues, std::size_t burstSize, Scheduler &scheduler, beast::Journal journal) override
Create an instance of this factory's backend.
std::string getName() const override
Retrieve the name of this factory.
Scheduling for asynchronous backend activity.
virtual void onBatchWrite(BatchWriteReport const &report)=0
Reports the completion of a batch write. Allows the scheduler to monitor the node store's performance.
Holds a collection of configuration values.
Definition BasicConfig.h:26
T empty(T... args)
T is_same_v
void registerNuDBFactory(Manager &manager)
std::pair< void const *, std::size_t > nodeobject_compress(void const *in, std::size_t in_size, BufferFactory &&bf)
Definition codec.h:202
Status
Return codes from Backend operations.
std::pair< void const *, std::size_t > nodeobject_decompress(void const *in, std::size_t in_size, BufferFactory &&bf)
Definition codec.h:91
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
std::error_code make_error_code(xrpl::TokenCodecErrc e)
T get(Section const &section, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.
@ open
We haven't closed our ledger yet, but others might have.
bool get_if_exists(Section const &section, std::string const &name, T &v)
@ no
Definition Steps.h:26
T push_back(T... args)
T reserve(T... args)
T reset(T... args)
T size(T... args)
T str(T... args)
Contains information about a batch write operation.
T what(T... args)