1#include <xrpl/basics/contract.h>
2#include <xrpl/beast/clock/basic_seconds_clock.h>
3#include <xrpl/beast/rfc2616.h>
4#include <xrpl/beast/unit_test/suite.h>
5#include <xrpl/nodestore/detail/codec.h>
7#include <boost/beast/core/string.hpp>
8#include <boost/regex.hpp>
9#include <boost/regex/v5/regbase.hpp>
10#include <boost/regex/v5/regex.hpp>
11#include <boost/regex/v5/regex_match.hpp>
13#include <nudb/create.hpp>
14#include <nudb/detail/bucket.hpp>
15#include <nudb/detail/buffer.hpp>
16#include <nudb/detail/bulkio.hpp>
17#include <nudb/detail/field.hpp>
18#include <nudb/detail/format.hpp>
19#include <nudb/detail/stream.hpp>
20#include <nudb/error.hpp>
21#include <nudb/file.hpp>
22#include <nudb/native_file.hpp>
23#include <nudb/xxhasher.hpp>
25#if XRPL_ROCKSDB_AVAILABLE
27#include <rocksdb/db.h>
28#include <rocksdb/iterator.h>
29#include <rocksdb/options.h>
30#include <rocksdb/status.h>
// Fragmentary listing of what appears to be prettyTime: only the template
// header and five output statements are visible; the lines between them are
// not shown here.
98template <
class Rep,
class Period>
// Each visible statement streams the duration `d`, rounded to the named chrono
// unit, as a bare tick count (no unit suffix is written by these statements).
// NOTE(review): which statement runs is presumably chosen by the magnitude of
// `d` in lines not shown -- confirm against the full source.
116 os << round<nanoseconds>(d).count();
132 os << round<microseconds>(d).count();
148 os << round<milliseconds>(d).count();
164 os << round<seconds>(d).count();
180 os << round<minutes>(d).count();
// Template header declaring its parameters in the order (Period, Rep).
// NOTE(review): this looks like the header of fmtdur. std::chrono::duration
// takes its parameters as (Rep, Period); if these names are forwarded in the
// declared order, deduction still works but the names are swapped relative to
// the standard's meaning -- confirm against the full declaration.
187template <
class Period,
class Rep>
// Time that has passed between `start_` and `now`.
228 auto const elapsed = now -
start_;
// Elapsed clock ticks per unit of `work`, computed in floating point; i.e. a
// cost per work item rather than a throughput.
// NOTE(review): when `work` is 0 this is a floating-point division by zero;
// any guard against that would be in lines not shown -- confirm.
239 auto const rate = elapsed.count() / double(work);
// Function-local static regex: constructed on first use and reused by later
// calls. Only the first capture group of the pattern is visible in this
// listing -- an identifier that starts with a letter and continues with
// letters, digits or underscores. The rest of the pattern is in lines not
// shown.
261 static boost::regex
const kRe1(
264 "([a-zA-Z][_a-zA-Z0-9]*)"
// `optimize` is a hint flag passed at construction; per the Boost.Regex
// documentation it does not change what the expression matches.
271 boost::regex_constants::optimize);
// Visit every element of `v` by const reference.
274 for (
auto const& kv : v)
// regex_match requires the whole element to match (it is not a substring
// search); `m` receives the match results. What is done with an element that
// fails to match is in lines not shown.
277 if (!boost::regex_match(kv, m, kRe1))
// Guard: the code that follows is compiled only when XRPL_ROCKSDB_AVAILABLE
// evaluates to non-zero. (Fragmentary listing: many lines of the enclosing
// test body are not shown.)
288#if XRPL_ROCKSDB_AVAILABLE
298 using namespace nudb;
299 using namespace nudb::detail;
// Parse the unit-test argument string. `usage` starts out true when no
// arguments were supplied at all.
302 auto const args = parseArgs(arg());
303 bool usage = args.empty();
// Each of the three checks below logs a message when a required key ("from",
// "to", "buffer") is absent. Because of the `!usage &&` short-circuit, a check
// is skipped once `usage` is already true.
// NOTE(review): presumably each failing check also sets `usage` in lines not
// shown -- confirm.
305 if (!usage && !args.contains(
"from"))
307 log <<
"Missing parameter: from";
310 if (!usage && !args.contains(
"to"))
312 log <<
"Missing parameter: to";
315 if (!usage && !args.contains(
"buffer"))
317 log <<
"Missing parameter: buffer";
// Usage text describing the expected --unittest-arg format and each key.
324 <<
"--unittest-arg=from=<from>,to=<to>,buffer=<buffer>\n"
325 <<
"from: RocksDB database to import from\n"
326 <<
"to: NuDB database to import to\n"
327 <<
"buffer: Buffer size (bigger is faster)\n"
328 <<
"NuDB database must not already exist.";
// "buffer" is converted with std::stoull, which throws std::invalid_argument
// or std::out_of_range on unparsable / too-large input; no handler for those
// is visible in this listing.
336 std::size_t
const bufferSize =
std::stoull(args.at(
"buffer"));
337 auto const fromPath = args.at(
"from");
338 auto const toPath = args.at(
"to");
// Hash function used for the key file (see the pepper<> and hash<> calls
// further down).
340 using hash_type = nudb::xxhasher;
// 64 * 1024 * 1024, deduced as int; passed to the bulk_writer / bulk_reader
// constructors further down.
341 auto const bulkSize = 64 * 1024 * 1024;
// Target average bucket fill, used when the bucket count is computed.
342 float const loadFactor = 0.5;
// Data-file and key-file paths are derived from the "to" path by appending
// ".dat" and ".key" respectively.
344 auto const dp = toPath +
".dat";
345 auto const kp = toPath +
".key";
349 log <<
"from: " << fromPath
// Owning handle for the source database.
// NOTE(review): the transfer of `pdb` into `db` is not visible in this
// listing; presumably it follows the status check below -- confirm.
357 std::unique_ptr<rocksdb::DB> db;
359 rocksdb::Options options;
// The source must already exist: do not create it when it is missing.
360 options.create_if_missing =
false;
361 options.max_open_files = 2000;
362 rocksdb::DB* pdb =
nullptr;
// Open the source database read-only; on success the raw handle is returned
// through `pdb`.
363 rocksdb::Status
const status = rocksdb::DB::OpenForReadOnly(options, fromPath, &pdb);
// Either a non-OK status or a null handle counts as failure; the exception
// message contains the path and the status text.
364 if (!
status.ok() || (pdb ==
nullptr))
365 Throw<std::runtime_error>(
"Can't open '" + fromPath +
"': " +
status.ToString());
// Item counter; it is read later when the bucket count is computed.
// NOTE(review): its increment is in lines not shown, presumably once per
// record written -- confirm.
369 std::size_t nitems = 0;
// Value-initialised data-file header; only the `version` assignment is visible
// in this listing.
370 dat_file_header dh{};
371 dh.version = currentVersion;
// Create the data file at `dp`. Errors are reported through `ec` and raised as
// nudb::system_error. NOTE(review): each Throw<nudb::system_error>(ec) in this
// listing is presumably guarded by an `if (ec)` on a line not shown.
378 df.create(file_mode::append, dp, ec);
380 Throw<nudb::system_error>(ec);
// Buffered writer over the data file, starting at offset 0.
381 bulk_writer<native_file> dw(df, 0, bulkSize);
// Reserve dat_file_header::size bytes in the writer for the header.
384 auto os = dw.prepare(dat_file_header::size, ec);
386 Throw<nudb::system_error>(ec);
// Read options for the full scan: checksum verification off and the block
// cache not populated by this iterator.
389 rocksdb::ReadOptions options;
390 options.verify_checksums =
false;
391 options.fill_cache =
false;
392 std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(options));
// Walk every key/value pair from the first key onward.
395 for (it->SeekToFirst(); it->Valid(); it->Next())
// Keys must be exactly 32 bytes long; any other length raises
// std::runtime_error (the message is in lines not shown).
397 if (it->key().size() != 32)
399 Throw<std::runtime_error>(
402 void const*
const key = it->key().data();
403 void const*
const data = it->value().data();
404 auto const size = it->value().size();
// Scratch buffer with the same byte count as the stored value; its use is in
// lines not shown.
405 std::unique_ptr<char[]>
const clean(
new char[size]);
// Test expectation that `check.second` equals the original value size.
// NOTE(review): `check` is produced in lines not shown, presumably by a
// compress/decompress round trip -- confirm.
413 BEAST_EXPECT(
check.second == size);
// Reserve space in the writer for one record; the size expression starts with
// a 48-bit field and continues in lines not shown.
417 auto os = dw.prepare(
418 field<uint48_t>::size +
423 Throw<nudb::system_error>(ec);
// Visible parts of the record: `out.second` written as a 48-bit value, then
// (after anything written in lines not shown) `out.second` bytes copied from
// `out.first`.
424 write<uint48_t>(os, out.second);
426 std::memcpy(os.data(out.second), out.first, out.second);
431 Throw<nudb::system_error>(ec);
// Size of the data file as reported by the file object; used below to bound
// the reader and to size the progress tracker.
435 auto const dfSize = df.size(ec);
437 Throw<nudb::system_error>(ec);
// Value-initialised key-file header. Fields assigned in lines not shown are
// omitted from this listing.
439 key_file_header kh{};
440 kh.version = currentVersion;
// Copied from the data-file header so both files carry the same appnum.
442 kh.appnum = dh.appnum;
444 kh.salt = make_salt();
// Pepper is derived from the salt using the configured hash type.
445 kh.pepper = pepper<hash_type>(kh.salt);
446 kh.block_size = block_size(kp);
// Bucket count = ceil(nitems / (bucket capacity * loadFactor)), i.e. enough
// buckets that the average bucket is filled to `loadFactor` of its capacity.
// NOTE(review): with nitems == 0 this evaluates to 0 buckets; how the code
// below and the resulting key file behave in that case is not visible here --
// confirm.
448 kh.buckets =
std::ceil(nitems / (bucket_capacity(kh.block_size) * loadFactor));
449 kh.modulus = ceil_pow2(kh.buckets);
// Create the key file at `kp`.
451 kf.create(file_mode::append, kp, ec);
453 Throw<nudb::system_error>(ec);
// One block-sized buffer; the output stream over it is presumably used to
// serialise the header in lines not shown.
454 buffer buf(kh.block_size);
457 ostream os(buf.get(), kh.block_size);
// The first block of the key file (offset 0, one block long) is written from
// the buffer.
459 kf.write(0, buf.get(), kh.block_size, ec);
461 Throw<nudb::system_error>(ec);
// `buckets` here is a local defined in lines not shown (distinct from
// kh.buckets); the buffer is grown to hold that many blocks.
467 buf.reserve(buckets * kh.block_size);
// Ceiling division: number of passes needed to cover kh.buckets when each
// pass handles at most `buckets` of them.
468 auto const passes = (kh.buckets + buckets - 1) / buckets;
469 log <<
"items: " << nitems
// Progress is sized to dfSize * passes: the data file is re-read once per
// pass, and progress is reported below as (npass * dfSize) + reader offset.
479 Progress p(dfSize * passes);
480 std::size_t npass = 0;
// Each iteration handles the half-open bucket range [b0, b1), clamped to
// kh.buckets; `bn` is the number of buckets in this pass.
481 for (std::size_t b0 = 0; b0 < kh.buckets; b0 += buckets)
483 auto const b1 =
std::min(b0 + buckets, kh.buckets);
485 auto const bn = b1 - b0;
// Construct each of the `bn` bucket slots in the buffer with the `empty` tag.
487 for (std::size_t i = 0; i < bn; ++i)
489 bucket
const b(kh.block_size, buf.get() + (i * kh.block_size), empty);
// Re-read the data file from just past its header up to dfSize.
493 bulk_reader<native_file> r(df, dat_file_header::size, dfSize, bulkSize);
// Remember where this record starts; this is the offset later inserted into
// the bucket.
496 auto const offset = r.offset();
498 std::size_t
size = 0;
// Read the 48-bit size field that begins the record.
499 auto is = r.prepare(field<uint48_t>::size, ec);
501 Throw<nudb::system_error>(ec);
502 read<uint48_t>(is, size);
511 Throw<nudb::system_error>(ec);
// NOTE(review): the key is fetched using dh.key_size but hashed using
// kh.key_size. The two are expected to be equal, but the assignment that
// would make them so is not visible in this listing -- confirm.
512 std::uint8_t
const*
const key = is.data(dh.key_size);
513 auto const h = hash<hash_type>(key, kh.key_size, kh.salt);
514 auto const n = bucket_index(h, kh.buckets, kh.modulus);
515 p(log, (npass * dfSize) + r.offset());
// Bucket `n` lies outside this pass's range [b0, b1). The action taken is in
// lines not shown (presumably the record is skipped until a later pass).
516 if (n < b0 || n >= b1)
// Bucket object over slot (n - b0) of the in-memory buffer.
518 bucket b(kh.block_size, buf.get() + ((n - b0) * kh.block_size));
// maybe_spill is called with the bucket and the data-file writer before the
// insert; NOTE(review): presumably it spills a full bucket to the data file --
// confirm against the helper's definition.
519 maybe_spill(b, dw, ec);
521 Throw<nudb::system_error>(ec);
522 b.insert(offset, size, h);
// Alternative record form whose size field is 16 bits wide.
// NOTE(review): presumably a spill record that is skipped over -- the branch
// selecting this path is in lines not shown.
528 is = r.prepare(field<std::uint16_t>::size, ec);
530 Throw<nudb::system_error>(ec);
531 read<std::uint16_t>(is, size);
534 Throw<nudb::system_error>(ec);
// Write this pass's `bn` buckets to the key file. The (b0 + 1) offset skips
// the block written at offset 0 earlier.
537 kf.write((b0 + 1) * kh.block_size, buf.get(), bn * kh.block_size, ec);
539 Throw<nudb::system_error>(ec);
544 Throw<nudb::system_error>(ec);
// Registers the suite using the MANUAL variant of the Beast test-suite macro,
// with the arguments import, nodestore, xrpl.
// NOTE(review): presumably the MANUAL variant means the suite runs only when
// requested explicitly, which would fit a test that needs --unittest-arg
// parameters -- confirm against the macro's definition.
549BEAST_DEFINE_TESTSUITE_MANUAL(
import, nodestore, xrpl);
A clock whose minimum resolution is one second.
Clock::time_point time_point
beast::BasicSecondsClock clock_type
clock_type::time_point start_
Progress(std::size_t work)
clock_type::time_point now_
clock_type::time_point report_
void operator()(Log &log, std::size_t work)
std::ios::char_type fill_
std::ios::fmtflags flags_
SaveStreamState & operator=(SaveStreamState const &)=delete
std::streamsize precision_
SaveStreamState(SaveStreamState const &)=delete
SaveStreamState(std::ostream &os)
Result split(FwdIt first, FwdIt last, Char delim)
Parse a character sequence of values separated by commas.
void check(bool condition, std::string const &message)
void filterInner(void *in, std::size_t inSize)
std::pair< void const *, std::size_t > nodeobjectCompress(void const *in, std::size_t inSize, BufferFactory &&bf)
std::map< std::string, std::string, boost::beast::iless > parseArgs(std::string const &s)
void write(nudb::detail::ostream &os, std::size_t t)
std::pair< void const *, std::size_t > nodeobjectDecompress(void const *in, std::size_t inSize, BufferFactory &&bf)
std::ostream & prettyTime(std::ostream &os, std::chrono::duration< Rep, Period > d)
std::string fmtdur(std::chrono::duration< Period, Rep > const &d)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
int run(int argc, char **argv)
XRPL_NO_SANITIZE_ADDRESS void Throw(Args &&... args)
T setprecision(T... args)