xrpld
Loading...
Searching...
No Matches
import_test.cpp
1#include <xrpl/basics/contract.h>
2#include <xrpl/beast/clock/basic_seconds_clock.h>
3#include <xrpl/beast/rfc2616.h>
4#include <xrpl/beast/unit_test/suite.h>
5#include <xrpl/nodestore/detail/codec.h>
6
7#include <boost/beast/core/string.hpp>
8#include <boost/regex.hpp> // IWYU pragma: keep
9#include <boost/regex/v5/regbase.hpp>
10#include <boost/regex/v5/regex.hpp>
11#include <boost/regex/v5/regex_match.hpp>
12
13#include <nudb/create.hpp> // IWYU pragma: keep
14#include <nudb/detail/bucket.hpp>
15#include <nudb/detail/buffer.hpp>
16#include <nudb/detail/bulkio.hpp>
17#include <nudb/detail/field.hpp>
18#include <nudb/detail/format.hpp>
19#include <nudb/detail/stream.hpp>
20#include <nudb/error.hpp>
21#include <nudb/file.hpp>
22#include <nudb/native_file.hpp>
23#include <nudb/xxhasher.hpp>
24
25#if XRPL_ROCKSDB_AVAILABLE
26
27#include <rocksdb/db.h>
28#include <rocksdb/iterator.h>
29#include <rocksdb/options.h>
30#include <rocksdb/status.h>
31
32#endif
33
34#include <algorithm>
35#include <chrono>
36#include <cmath>
37#include <cstddef>
38#include <cstdint>
39#include <cstring>
40#include <iomanip>
41#include <ios>
42#include <map>
43#include <memory>
44#include <ostream>
45#include <ratio>
46#include <sstream>
47#include <stdexcept>
48#include <string>
49
/*

Math (rough I/O cost of building the key file):

Example database:
    1,000 GB data file
    170 GB key file
    bucket capacity of 113 keys

Normal:
    1,000 GB data file read
    19,210 GB key file read   (113 * 170)
    19,210 GB key file write

Multi-pass with a 32 GB bucket buffer (the approach used below):
    6 passes                  (170 / 32, rounded up)
    6,000 GB data file read   (one full data file read per pass)
    170 GB key file write

*/
70
71namespace xrpl {
72
namespace detail {

/** Restores an output stream's formatting state when it goes out of scope.

    The stream's precision, format flags and fill character are captured on
    construction and written back on destruction, so code in between may
    freely apply manipulators such as `std::fixed` or `std::setprecision`.
*/
class SaveStreamState
{
    std::ostream& os_;
    std::streamsize precision_;
    std::ios::fmtflags flags_;
    std::ios::char_type fill_;

public:
    ~SaveStreamState()
    {
        os_.precision(precision_);
        os_.flags(flags_);
        os_.fill(fill_);
    }
    SaveStreamState(SaveStreamState const&) = delete;
    SaveStreamState&
    operator=(SaveStreamState const&) = delete;
    explicit SaveStreamState(std::ostream& os)
        : os_(os), precision_(os.precision()), flags_(os.flags()), fill_(os.fill())
    {
    }
};

/** Write a human-readable rendering of a duration to a stream.

    The unit (ns, us, ms, s, min) is chosen from the magnitude of `d`. Within
    the chosen unit, values below 100 are printed as a float with one digit
    after the decimal point; larger values are rounded to a whole number.
    The stream's formatting state is restored before returning.

    @param os The stream to write to.
    @param d  The duration to format.
    @return `os`.
*/
template <class Rep, class Period>
std::ostream&
prettyTime(std::ostream& os, std::chrono::duration<Rep, Period> d)
{
    SaveStreamState const _(os);
    using namespace std::chrono;

    // Print `d` expressed in the unit type of `unit` followed by `suffix`.
    auto const put = [&os, d](auto unit, char const* suffix) {
        using Unit = decltype(unit);
        if (d < Unit{100})
        {
            // use floating
            os << std::fixed << std::setprecision(1)
               << std::chrono::duration<float, typename Unit::period>(d).count();
        }
        else
        {
            // use integral
            os << std::chrono::round<Unit>(d).count();
        }
        os << suffix;
    };

    if (d < microseconds{1})
        put(nanoseconds{}, "ns");
    else if (d < milliseconds{1})
        put(microseconds{}, "us");
    else if (d < seconds{1})
        put(milliseconds{}, "ms");
    else if (d < minutes{1})
        put(seconds{}, "s");  // always below 100s here, so always one decimal
    else
        put(minutes{}, "min");
    return os;
}

/** Return the prettyTime() rendering of a duration as a string. */
template <class Rep, class Period>
inline std::string
fmtdur(std::chrono::duration<Rep, Period> const& d)
{
    std::ostringstream ss;
    prettyTime(ss, d);
    return ss.str();
}

} // namespace detail
197
198namespace NodeStore {
199
200//------------------------------------------------------------------------------
201
203{
    // Periodic progress reporter for a long-running job. The total amount of
    // work is fixed at construction; operator() is given the amount completed
    // so far and occasionally logs an estimate of the time remaining.
204private:
206
212 bool estimate_ = false;
213
214public:
    // `work` is the total amount of work the job is expected to perform; it is
    // the target that completed work is measured against in operator().
215 explicit Progress(std::size_t work) : work_(work)
216 {
217 }
218
    // Report progress given `work` units completed so far (out of work_).
    // Returns without logging when:
    //  - called again within the same tick of clock_type (a one-second
    //    resolution clock), or
    //  - less than 15 seconds have elapsed since start_ (first report only), or
    //  - less than 60 seconds have passed since the previous report.
    // The remaining time is extrapolated linearly from the average rate so far:
    //     remain = (work_ - work) * elapsed / work
    // NOTE(review): work == 0 makes `rate` infinite, and work > work_ wraps the
    // unsigned subtraction; callers appear to keep 0 < work <= work_ — confirm.
219 template <class Log>
220 void
222 {
223 using namespace std::chrono;
224 auto const now = clock_type::now();
225 if (now == now_)
226 return;
227 now_ = now;
228 auto const elapsed = now - start_;
229 if (!estimate_)
230 {
    // Hold off the first estimate until enough time has passed for the
    // average rate to be meaningful.
231 if (elapsed < seconds(15))
232 return;
233 estimate_ = true;
234 }
235 else if (now - report_ < std::chrono::seconds(60))
236 {
237 return;
238 }
    // Clock ticks spent per unit of work so far.
239 auto const rate = elapsed.count() / double(work);
240 clock_type::duration const remain(
241 static_cast<clock_type::duration::rep>((work_ - work) * rate));
    // Logs: remaining time, overall progress, and work done since last report.
242 log << "Remaining: " << detail::fmtdur(remain) << " (" << work << " of " << work_ << " in "
243 << detail::fmtdur(elapsed) << ", " << (work - prev_) << " in "
244 << detail::fmtdur(now - report_) << ")";
245 report_ = now;
246 prev_ = work;
247 }
248
    // Log the total time elapsed since start_.
249 template <class Log>
250 void
252 {
253 log << "Total time: " << detail::fmtdur(clock_type::now() - start_);
254 }
255};
256
259{
260 // <key> '=' <value>
261 static boost::regex const kRe1(
262 "^" // start of line
263 "(?:\\s*)" // whitespace (optional)
264 "([a-zA-Z][_a-zA-Z0-9]*)" // <key>
265 "(?:\\s*)" // whitespace (optional)
266 "(?:=)" // '='
267 "(?:\\s*)" // whitespace (optional)
268 "(.*\\S+)" // <value>
269 "(?:\\s*)" // whitespace (optional)
270 ,
271 boost::regex_constants::optimize);
273 auto const v = beast::rfc2616::split(s.begin(), s.end(), ',');
274 for (auto const& kv : v)
275 {
276 boost::smatch m;
277 if (!boost::regex_match(kv, m, kRe1))
278 Throw<std::runtime_error>("invalid parameter " + kv);
279 auto const result = map.emplace(m[1], m[2]);
280 if (!result.second)
281 Throw<std::runtime_error>("duplicate parameter " + m[1]);
282 }
283 return map;
284}
285
286//------------------------------------------------------------------------------
287
288#if XRPL_ROCKSDB_AVAILABLE
289
290class import_test : public beast::unit_test::Suite
291{
292public:
293 void
294 run() override
295 {
296 testcase(beast::unit_test::AbortT::AbortOnFail) << arg();
297
298 using namespace nudb;
299 using namespace nudb::detail;
300
301 pass();
302 auto const args = parseArgs(arg());
303 bool usage = args.empty();
304
305 if (!usage && !args.contains("from"))
306 {
307 log << "Missing parameter: from";
308 usage = true;
309 }
310 if (!usage && !args.contains("to"))
311 {
312 log << "Missing parameter: to";
313 usage = true;
314 }
315 if (!usage && !args.contains("buffer"))
316 {
317 log << "Missing parameter: buffer";
318 usage = true;
319 }
320
321 if (usage)
322 {
323 log << "Usage:\n"
324 << "--unittest-arg=from=<from>,to=<to>,buffer=<buffer>\n"
325 << "from: RocksDB database to import from\n"
326 << "to: NuDB database to import to\n"
327 << "buffer: Buffer size (bigger is faster)\n"
328 << "NuDB database must not already exist.";
329 return;
330 }
331
332 // This controls the size of the bucket buffer.
333 // For a 1TB data file, a 32GB bucket buffer is suggested.
334 // The larger the buffer, the faster the import.
335 //
336 std::size_t const bufferSize = std::stoull(args.at("buffer"));
337 auto const fromPath = args.at("from");
338 auto const toPath = args.at("to");
339
340 using hash_type = nudb::xxhasher;
341 auto const bulkSize = 64 * 1024 * 1024;
342 float const loadFactor = 0.5;
343
344 auto const dp = toPath + ".dat";
345 auto const kp = toPath + ".key";
346
347 auto const start = std::chrono::steady_clock::now();
348
349 log << "from: " << fromPath
350 << "\n"
351 "to: "
352 << toPath
353 << "\n"
354 "buffer: "
355 << bufferSize;
356
357 std::unique_ptr<rocksdb::DB> db;
358 {
359 rocksdb::Options options;
360 options.create_if_missing = false;
361 options.max_open_files = 2000; // 5000?
362 rocksdb::DB* pdb = nullptr;
363 rocksdb::Status const status = rocksdb::DB::OpenForReadOnly(options, fromPath, &pdb);
364 if (!status.ok() || (pdb == nullptr))
365 Throw<std::runtime_error>("Can't open '" + fromPath + "': " + status.ToString());
366 db.reset(pdb);
367 }
368 // Create data file with values
369 std::size_t nitems = 0;
370 dat_file_header dh{};
371 dh.version = currentVersion;
372 dh.uid = make_uid();
373 dh.appnum = 1;
374 dh.key_size = 32;
375
376 native_file df;
377 error_code ec;
378 df.create(file_mode::append, dp, ec);
379 if (ec)
380 Throw<nudb::system_error>(ec);
381 bulk_writer<native_file> dw(df, 0, bulkSize);
382 {
383 {
384 auto os = dw.prepare(dat_file_header::size, ec);
385 if (ec)
386 Throw<nudb::system_error>(ec);
387 write(os, dh);
388 }
389 rocksdb::ReadOptions options;
390 options.verify_checksums = false;
391 options.fill_cache = false;
392 std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(options));
393
394 buffer buf;
395 for (it->SeekToFirst(); it->Valid(); it->Next())
396 {
397 if (it->key().size() != 32)
398 {
399 Throw<std::runtime_error>(
400 "Unexpected key size " + std::to_string(it->key().size()));
401 }
402 void const* const key = it->key().data();
403 void const* const data = it->value().data();
404 auto const size = it->value().size();
405 std::unique_ptr<char[]> const clean(new char[size]);
406 std::memcpy(clean.get(), data, size);
407 filterInner(clean.get(), size);
408 auto const out = nodeobjectCompress(clean.get(), size, buf);
409 // Verify codec correctness
410 {
411 buffer buf2;
412 auto const check = nodeobjectDecompress(out.first, out.second, buf2);
413 BEAST_EXPECT(check.second == size);
414 BEAST_EXPECT(std::memcmp(check.first, clean.get(), size) == 0);
415 }
416 // Data Record
417 auto os = dw.prepare(
418 field<uint48_t>::size + // Size
419 32 + // Key
420 out.second,
421 ec);
422 if (ec)
423 Throw<nudb::system_error>(ec);
424 write<uint48_t>(os, out.second);
425 std::memcpy(os.data(32), key, 32);
426 std::memcpy(os.data(out.second), out.first, out.second);
427 ++nitems;
428 }
429 dw.flush(ec);
430 if (ec)
431 Throw<nudb::system_error>(ec);
432 }
433 db.reset();
434 log << "Import data: " << detail::fmtdur(std::chrono::steady_clock::now() - start);
435 auto const dfSize = df.size(ec);
436 if (ec)
437 Throw<nudb::system_error>(ec);
438 // Create key file
439 key_file_header kh{};
440 kh.version = currentVersion;
441 kh.uid = dh.uid;
442 kh.appnum = dh.appnum;
443 kh.key_size = 32;
444 kh.salt = make_salt();
445 kh.pepper = pepper<hash_type>(kh.salt);
446 kh.block_size = block_size(kp);
447 kh.load_factor = std::min<std::size_t>(65536.0 * loadFactor, 65535);
448 kh.buckets = std::ceil(nitems / (bucket_capacity(kh.block_size) * loadFactor));
449 kh.modulus = ceil_pow2(kh.buckets);
450 native_file kf;
451 kf.create(file_mode::append, kp, ec);
452 if (ec)
453 Throw<nudb::system_error>(ec);
454 buffer buf(kh.block_size);
455 {
456 std::memset(buf.get(), 0, kh.block_size);
457 ostream os(buf.get(), kh.block_size);
458 write(os, kh);
459 kf.write(0, buf.get(), kh.block_size, ec);
460 if (ec)
461 Throw<nudb::system_error>(ec);
462 }
463 // Build contiguous sequential sections of the
464 // key file using multiple passes over the data.
465 //
466 auto const buckets = std::max<std::size_t>(1, bufferSize / kh.block_size);
467 buf.reserve(buckets * kh.block_size);
468 auto const passes = (kh.buckets + buckets - 1) / buckets;
469 log << "items: " << nitems
470 << "\n"
471 "buckets: "
472 << kh.buckets
473 << "\n"
474 "data: "
475 << dfSize
476 << "\n"
477 "passes: "
478 << passes;
479 Progress p(dfSize * passes);
480 std::size_t npass = 0;
481 for (std::size_t b0 = 0; b0 < kh.buckets; b0 += buckets)
482 {
483 auto const b1 = std::min(b0 + buckets, kh.buckets);
484 // Buffered range is [b0, b1)
485 auto const bn = b1 - b0;
486 // Create empty buckets
487 for (std::size_t i = 0; i < bn; ++i)
488 {
489 bucket const b(kh.block_size, buf.get() + (i * kh.block_size), empty);
490 }
491 // Insert all keys into buckets
492 // Iterate Data File
493 bulk_reader<native_file> r(df, dat_file_header::size, dfSize, bulkSize);
494 while (!r.eof())
495 {
496 auto const offset = r.offset();
497 // Data Record or Spill Record
498 std::size_t size = 0;
499 auto is = r.prepare(field<uint48_t>::size, ec); // Size
500 if (ec)
501 Throw<nudb::system_error>(ec);
502 read<uint48_t>(is, size);
503 if (size > 0)
504 {
505 // Data Record
506 is = r.prepare(
507 dh.key_size + // Key
508 size,
509 ec); // Data
510 if (ec)
511 Throw<nudb::system_error>(ec);
512 std::uint8_t const* const key = is.data(dh.key_size);
513 auto const h = hash<hash_type>(key, kh.key_size, kh.salt);
514 auto const n = bucket_index(h, kh.buckets, kh.modulus);
515 p(log, (npass * dfSize) + r.offset());
516 if (n < b0 || n >= b1)
517 continue;
518 bucket b(kh.block_size, buf.get() + ((n - b0) * kh.block_size));
519 maybe_spill(b, dw, ec);
520 if (ec)
521 Throw<nudb::system_error>(ec);
522 b.insert(offset, size, h);
523 }
524 else
525 {
526 // VFALCO Should never get here
527 // Spill Record
528 is = r.prepare(field<std::uint16_t>::size, ec);
529 if (ec)
530 Throw<nudb::system_error>(ec);
531 read<std::uint16_t>(is, size); // Size
532 r.prepare(size, ec); // skip
533 if (ec)
534 Throw<nudb::system_error>(ec);
535 }
536 }
537 kf.write((b0 + 1) * kh.block_size, buf.get(), bn * kh.block_size, ec);
538 if (ec)
539 Throw<nudb::system_error>(ec);
540 ++npass;
541 }
542 dw.flush(ec);
543 if (ec)
544 Throw<nudb::system_error>(ec);
545 p.finish(log);
546 }
547};
548
549BEAST_DEFINE_TESTSUITE_MANUAL(import, nodestore, xrpl);
550
551#endif
552
553//------------------------------------------------------------------------------
554
555} // namespace NodeStore
556} // namespace xrpl
T begin(T... args)
T ceil(T... args)
A clock whose minimum resolution is one second.
A testsuite class.
Definition suite.h:50
beast::BasicSecondsClock clock_type
clock_type::time_point start_
Progress(std::size_t work)
clock_type::time_point now_
clock_type::time_point report_
std::size_t const work_
void operator()(Log &log, std::size_t work)
std::ios::char_type fill_
SaveStreamState & operator=(SaveStreamState const &)=delete
SaveStreamState(SaveStreamState const &)=delete
SaveStreamState(std::ostream &os)
T data(T... args)
T emplace(T... args)
T end(T... args)
T fixed(T... args)
T log(T... args)
T max(T... args)
T memcmp(T... args)
T memcpy(T... args)
T memset(T... args)
T min(T... args)
Result split(FwdIt first, FwdIt last, Char delim)
Parse a character sequence of values separated by commas.
Definition rfc2616.h:102
void check(bool condition, std::string const &message)
void filterInner(void *in, std::size_t inSize)
Definition codec.h:289
std::pair< void const *, std::size_t > nodeobjectCompress(void const *in, std::size_t inSize, BufferFactory &&bf)
Definition codec.h:197
std::map< std::string, std::string, boost::beast::iless > parseArgs(std::string const &s)
void write(nudb::detail::ostream &os, std::size_t t)
Definition varint.h:117
std::pair< void const *, std::size_t > nodeobjectDecompress(void const *in, std::size_t inSize, BufferFactory &&bf)
Definition codec.h:85
std::ostream & prettyTime(std::ostream &os, std::chrono::duration< Rep, Period > d)
std::string fmtdur(std::chrono::duration< Period, Rep > const &d)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
int run(int argc, char **argv)
Definition Main.cpp:354
XRPL_NO_SANITIZE_ADDRESS void Throw(Args &&... args)
Definition contract.h:49
T reset(T... args)
T setprecision(T... args)
T size(T... args)
T stoull(T... args)
T str(T... args)
T to_string(T... args)