1#include <xrpl/basics/contract.h>
2#include <xrpl/basics/rocksdb.h>
3#include <xrpl/beast/clock/basic_seconds_clock.h>
4#include <xrpl/beast/core/LexicalCast.h>
5#include <xrpl/beast/rfc2616.h>
6#include <xrpl/beast/unit_test.h>
7#include <xrpl/nodestore/detail/codec.h>
9#include <boost/beast/core/string.hpp>
10#include <boost/regex.hpp>
12#include <nudb/create.hpp>
13#include <nudb/detail/format.hpp>
14#include <nudb/xxhasher.hpp>
70template <
class Rep,
class Period>
88 os << round<nanoseconds>(d).count();
104 os << round<microseconds>(d).count();
120 os << round<milliseconds>(d).count();
136 os << round<seconds>(d).count();
152 os << round<minutes>(d).count();
159template <
class Period,
class Rep>
200 auto const elapsed = now -
start_;
211 auto const rate = elapsed.count() / double(work);
214 log <<
"Remaining: " <<
detail::fmtdur(remain) <<
" (" << work <<
" of " <<
work_ <<
" in "
233 static boost::regex
const re1(
236 "([a-zA-Z][_a-zA-Z0-9]*)"
243 boost::regex_constants::optimize);
246 for (
auto const& kv : v)
249 if (!boost::regex_match(kv, m, re1))
250 Throw<std::runtime_error>(
"invalid parameter " + kv);
253 Throw<std::runtime_error>(
"duplicate parameter " + m[1]);
260#if XRPL_ROCKSDB_AVAILABLE
270 using namespace nudb;
271 using namespace nudb::detail;
274 auto const args = parse_args(arg());
275 bool usage = args.empty();
277 if (!usage && args.find(
"from") == args.end())
279 log <<
"Missing parameter: from";
282 if (!usage && args.find(
"to") == args.end())
284 log <<
"Missing parameter: to";
287 if (!usage && args.find(
"buffer") == args.end())
289 log <<
"Missing parameter: buffer";
296 <<
"--unittest-arg=from=<from>,to=<to>,buffer=<buffer>\n"
297 <<
"from: RocksDB database to import from\n"
298 <<
"to: NuDB database to import to\n"
299 <<
"buffer: Buffer size (bigger is faster)\n"
300 <<
"NuDB database must not already exist.";
309 auto const from_path = args.at(
"from");
310 auto const to_path = args.at(
"to");
312 using hash_type = nudb::xxhasher;
313 auto const bulk_size = 64 * 1024 * 1024;
314 float const load_factor = 0.5;
316 auto const dp = to_path +
".dat";
317 auto const kp = to_path +
".key";
321 log <<
"from: " << from_path
331 rocksdb::Options options;
332 options.create_if_missing =
false;
333 options.max_open_files = 2000;
334 rocksdb::DB* pdb =
nullptr;
335 rocksdb::Status
const status = rocksdb::DB::OpenForReadOnly(options, from_path, &pdb);
336 if (!
status.ok() || (pdb ==
nullptr))
337 Throw<std::runtime_error>(
"Can't open '" + from_path +
"': " +
status.ToString());
342 dat_file_header dh{};
343 dh.version = currentVersion;
350 df.create(file_mode::append, dp, ec);
352 Throw<nudb::system_error>(ec);
353 bulk_writer<native_file> dw(df, 0, bulk_size);
356 auto os = dw.prepare(dat_file_header::size, ec);
358 Throw<nudb::system_error>(ec);
361 rocksdb::ReadOptions options;
362 options.verify_checksums =
false;
363 options.fill_cache =
false;
367 for (it->SeekToFirst(); it->Valid(); it->Next())
369 if (it->key().size() != 32)
371 Throw<std::runtime_error>(
374 void const*
const key = it->key().data();
375 void const*
const data = it->value().data();
376 auto const size = it->value().size();
385 BEAST_EXPECT(
check.second == size);
389 auto os = dw.prepare(
390 field<uint48_t>::size +
395 Throw<nudb::system_error>(ec);
396 write<uint48_t>(os,
out.second);
403 Throw<nudb::system_error>(ec);
407 auto const df_size = df.size(ec);
409 Throw<nudb::system_error>(ec);
411 key_file_header kh{};
412 kh.version = currentVersion;
414 kh.appnum = dh.appnum;
416 kh.salt = make_salt();
417 kh.pepper = pepper<hash_type>(kh.salt);
418 kh.block_size = block_size(kp);
420 kh.buckets =
std::ceil(nitems / (bucket_capacity(kh.block_size) * load_factor));
421 kh.modulus = ceil_pow2(kh.buckets);
423 kf.create(file_mode::append, kp, ec);
425 Throw<nudb::system_error>(ec);
426 buffer buf(kh.block_size);
429 ostream os(buf.get(), kh.block_size);
431 kf.write(0, buf.get(), kh.block_size, ec);
433 Throw<nudb::system_error>(ec);
439 buf.reserve(buckets * kh.block_size);
440 auto const passes = (kh.buckets + buckets - 1) / buckets;
441 log <<
"items: " << nitems
451 progress p(df_size * passes);
453 for (
std::size_t b0 = 0; b0 < kh.buckets; b0 += buckets)
455 auto const b1 =
std::min(b0 + buckets, kh.buckets);
457 auto const bn = b1 - b0;
461 bucket
const b(kh.block_size, buf.get() + (i * kh.block_size), empty);
465 bulk_reader<native_file> r(df, dat_file_header::size, df_size, bulk_size);
468 auto const offset = r.offset();
471 auto is = r.prepare(field<uint48_t>::size, ec);
473 Throw<nudb::system_error>(ec);
474 read<uint48_t>(is, size);
483 Throw<nudb::system_error>(ec);
486 auto const n = bucket_index(h, kh.buckets, kh.modulus);
487 p(log, (npass * df_size) + r.offset());
488 if (n < b0 || n >= b1)
490 bucket b(kh.block_size, buf.get() + ((n - b0) * kh.block_size));
491 maybe_spill(b, dw, ec);
493 Throw<nudb::system_error>(ec);
494 b.insert(offset, size, h);
500 is = r.prepare(field<std::uint16_t>::size, ec);
502 Throw<nudb::system_error>(ec);
503 read<std::uint16_t>(is, size);
506 Throw<nudb::system_error>(ec);
509 kf.write((b0 + 1) * kh.block_size, buf.get(), bn * kh.block_size, ec);
511 Throw<nudb::system_error>(ec);
516 Throw<nudb::system_error>(ec);
521BEAST_DEFINE_TESTSUITE_MANUAL(
import, nodestore,
xrpl);
A clock whose minimum resolution is one second.
typename Clock::duration duration
typename Clock::time_point time_point
clock_type::time_point report_
clock_type::time_point start_
progress(std::size_t work)
void operator()(Log &log, std::size_t work)
clock_type::time_point now_
save_stream_state(std::ostream &os)
std::streamsize precision_
std::ios::char_type fill_
save_stream_state(save_stream_state const &)=delete
save_stream_state & operator=(save_stream_state const &)=delete
std::ios::fmtflags flags_
void check(bool condition, std::string const &message)
Result split(FwdIt first, FwdIt last, Char delim)
Parse a character sequence of values separated by commas.
std::pair< void const *, std::size_t > nodeobject_compress(void const *in, std::size_t in_size, BufferFactory &&bf)
void filter_inner(void *in, std::size_t in_size)
void write(nudb::detail::ostream &os, std::size_t t)
std::pair< void const *, std::size_t > nodeobject_decompress(void const *in, std::size_t in_size, BufferFactory &&bf)
std::map< std::string, std::string, boost::beast::iless > parse_args(std::string const &s)
std::string fmtdur(std::chrono::duration< Period, Rep > const &d)
std::ostream & pretty_time(std::ostream &os, std::chrono::duration< Rep, Period > d)
auto const data
General field definitions, or fields used in multiple transaction namespaces.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
int run(int argc, char **argv)
T setprecision(T... args)