import_test.cpp
#include <xrpl/basics/contract.h>
#include <xrpl/basics/rocksdb.h>
#include <xrpl/beast/clock/basic_seconds_clock.h>
#include <xrpl/beast/core/LexicalCast.h>
#include <xrpl/beast/rfc2616.h>
#include <xrpl/beast/unit_test.h>
#include <xrpl/nodestore/detail/codec.h>

#include <boost/beast/core/string.hpp>
#include <boost/regex.hpp>

#include <nudb/create.hpp>
#include <nudb/detail/format.hpp>
#include <nudb/xxhasher.hpp>

#include <algorithm>
#include <chrono>
#include <cmath>
#include <cstdint>
#include <cstring>
#include <iomanip>
#include <map>
#include <memory>
#include <sstream>
#include <string>

/*

Math:

1,000 GB dat file
170 GB key file
capacity: 113 keys/bucket

normal:
 1,000 GB data file read
19,210 GB key file read (113 * 170)
19,210 GB key file write

multi (32 GB bucket buffer):
6 passes (170 / 32, rounded up)
6,000 GB data file read
  170 GB key file write

*/
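// A minimal sketch of the arithmetic above, assuming the figures quoted in
// the comment (1,000 GB data file, 170 GB key file, 113 keys/bucket, 32 GB
// bucket buffer); the names below are illustrative only:
//
//     constexpr std::size_t dat_gb = 1000;
//     constexpr std::size_t key_gb = 170;
//     constexpr std::size_t keys_per_bucket = 113;
//     constexpr std::size_t buffer_gb = 32;
//     constexpr std::size_t passes = (key_gb + buffer_gb - 1) / buffer_gb;  // 6
//     constexpr std::size_t naive_key_io_gb = keys_per_bucket * key_gb;     // 19,210
//     constexpr std::size_t multi_dat_read_gb = passes * dat_gb;            // 6,000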

namespace ripple {

namespace detail {

class save_stream_state
{
    std::ostream& os_;
    std::streamsize precision_;
    std::ios::fmtflags flags_;
    std::ios::char_type fill_;

public:
    ~save_stream_state()
    {
        os_.precision(precision_);
        os_.flags(flags_);
        os_.fill(fill_);
    }
    save_stream_state(save_stream_state const&) = delete;
    save_stream_state&
    operator=(save_stream_state const&) = delete;
    explicit save_stream_state(std::ostream& os)
        : os_(os)
        , precision_(os.precision())
        , flags_(os.flags())
        , fill_(os.fill())
    {
    }
};

template <class Rep, class Period>
std::ostream&
pretty_time(std::ostream& os, std::chrono::duration<Rep, Period> d)
{
    save_stream_state _(os);
    using namespace std::chrono;
    if (d < microseconds{1})
    {
        // use nanoseconds
        if (d < nanoseconds{100})
        {
            // use floating
            using ns = duration<float, std::nano>;
            os << std::fixed << std::setprecision(1) << ns(d).count();
        }
        else
        {
            // use integral
            os << round<nanoseconds>(d).count();
        }
        os << "ns";
    }
    else if (d < milliseconds{1})
    {
        // use microseconds
        if (d < microseconds{100})
        {
            // use floating
            using ms = duration<float, std::micro>;
            os << std::fixed << std::setprecision(1) << ms(d).count();
        }
        else
        {
            // use integral
            os << round<microseconds>(d).count();
        }
        os << "us";
    }
    else if (d < seconds{1})
    {
        // use milliseconds
        if (d < milliseconds{100})
        {
            // use floating
            using ms = duration<float, std::milli>;
            os << std::fixed << std::setprecision(1) << ms(d).count();
        }
        else
        {
            // use integral
            os << round<milliseconds>(d).count();
        }
        os << "ms";
    }
    else if (d < minutes{1})
    {
        // use seconds
        if (d < seconds{100})
        {
            // use floating
            using s = duration<float>;
            os << std::fixed << std::setprecision(1) << s(d).count();
        }
        else
        {
            // use integral
            os << round<seconds>(d).count();
        }
        os << "s";
    }
    else
    {
        // use minutes
        if (d < minutes{100})
        {
            // use floating
            using m = duration<float, std::ratio<60>>;
            os << std::fixed << std::setprecision(1) << m(d).count();
        }
        else
        {
            // use integral
            os << round<minutes>(d).count();
        }
        os << "min";
    }
    return os;
}
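
// For example, pretty_time(os, std::chrono::microseconds{250}) writes
// "250us" (integral form, since 250us >= 100us), while 50us prints as
// "50.0us" (floating form with one decimal place).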

template <class Period, class Rep>
inline std::string
fmtdur(std::chrono::duration<Period, Rep> const& d)
{
    std::stringstream ss;
    pretty_time(ss, d);
    return ss.str();
}
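
// Example: fmtdur(std::chrono::milliseconds{2500}) returns "2.5s", since
// 2.5 seconds falls in the floating-point seconds range of pretty_time.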

} // namespace detail

namespace NodeStore {

//------------------------------------------------------------------------------

class progress
{
private:
    using clock_type = beast::basic_seconds_clock;

    std::size_t const work_;
    clock_type::time_point start_ = clock_type::now();
    clock_type::time_point now_ = clock_type::now();
    clock_type::time_point report_ = clock_type::now();
    std::size_t prev_ = 0;
    bool estimate_ = false;

public:
    explicit progress(std::size_t work) : work_(work)
    {
    }

    template <class Log>
    void
    operator()(Log& log, std::size_t work)
    {
        using namespace std::chrono;
        auto const now = clock_type::now();
        if (now == now_)
            return;
        now_ = now;
        auto const elapsed = now - start_;
        if (!estimate_)
        {
            if (elapsed < seconds(15))
                return;
            estimate_ = true;
        }
        else if (now - report_ < std::chrono::seconds(60))
        {
            return;
        }
        auto const rate = elapsed.count() / double(work);
        clock_type::duration const remain(
            static_cast<clock_type::duration::rep>((work_ - work) * rate));
        log << "Remaining: " << detail::fmtdur(remain) << " (" << work << " of "
            << work_ << " in " << detail::fmtdur(elapsed) << ", "
            << (work - prev_) << " in " << detail::fmtdur(now - report_) << ")";
        report_ = now;
        prev_ = work;
    }

    template <class Log>
    void
    finish(Log& log)
    {
        log << "Total time: " << detail::fmtdur(clock_type::now() - start_);
    }
};
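
// Typical use, mirroring run() below (do_work() is a hypothetical stand-in
// for one unit of work):
//
//     progress p(total_work);
//     for (std::size_t i = 0; i != total_work; ++i)
//     {
//         do_work();
//         p(log, i);  // first report after ~15s, then at most once per 60s
//     }
//     p.finish(log);  // logs total elapsed time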

std::map<std::string, std::string, boost::beast::iless>
parse_args(std::string const& s)
{
    // <key> '=' <value>
    static boost::regex const re1(
        "^"                        // start of line
        "(?:\\s*)"                 // whitespace (optional)
        "([a-zA-Z][_a-zA-Z0-9]*)"  // <key>
        "(?:\\s*)"                 // whitespace (optional)
        "(?:=)"                    // '='
        "(?:\\s*)"                 // whitespace (optional)
        "(.*\\S+)"                 // <value>
        "(?:\\s*)"                 // whitespace (optional)
        ,
        boost::regex_constants::optimize);
    std::map<std::string, std::string, boost::beast::iless> map;
    auto const v = beast::rfc2616::split(s.begin(), s.end(), ',');
    for (auto const& kv : v)
    {
        boost::smatch m;
        if (!boost::regex_match(kv, m, re1))
            Throw<std::runtime_error>("invalid parameter " + kv);
        auto const result = map.emplace(m[1], m[2]);
        if (!result.second)
            Throw<std::runtime_error>("duplicate parameter " + m[1]);
    }
    return map;
}
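
// Example (placeholder paths):
//   parse_args("from=/db/rocksdb, to=/db/nudb, buffer=1000000")
// yields {"from" -> "/db/rocksdb", "to" -> "/db/nudb", "buffer" -> "1000000"};
// keys compare case-insensitively via boost::beast::iless.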

//------------------------------------------------------------------------------

#if XRPL_ROCKSDB_AVAILABLE

class import_test : public beast::unit_test::suite
{
public:
    void
    run() override
    {
        testcase(beast::unit_test::abort_on_fail) << arg();

        using namespace nudb;
        using namespace nudb::detail;

        pass();
        auto const args = parse_args(arg());
        bool usage = args.empty();

        if (!usage && args.find("from") == args.end())
        {
            log << "Missing parameter: from";
            usage = true;
        }
        if (!usage && args.find("to") == args.end())
        {
            log << "Missing parameter: to";
            usage = true;
        }
        if (!usage && args.find("buffer") == args.end())
        {
            log << "Missing parameter: buffer";
            usage = true;
        }

        if (usage)
        {
            log << "Usage:\n"
                << "--unittest-arg=from=<from>,to=<to>,buffer=<buffer>\n"
                << "from: RocksDB database to import from\n"
                << "to: NuDB database to import to\n"
                << "buffer: Buffer size (bigger is faster)\n"
                << "NuDB database must not already exist.";
            return;
        }

        // This controls the size of the bucket buffer.
        // For a 1TB data file, a 32GB bucket buffer is suggested.
        // The larger the buffer, the faster the import.
        //
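        // For example, buffer=34359738368 (32 * 1024^3 bytes) selects a
        // 32GB bucket buffer.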
        std::size_t const buffer_size = std::stoull(args.at("buffer"));
        auto const from_path = args.at("from");
        auto const to_path = args.at("to");

        using hash_type = nudb::xxhasher;
        auto const bulk_size = 64 * 1024 * 1024;
        float const load_factor = 0.5;

        auto const dp = to_path + ".dat";
        auto const kp = to_path + ".key";

        auto const start = std::chrono::steady_clock::now();

        log << "from: " << from_path
            << "\n"
               "to: "
            << to_path
            << "\n"
               "buffer: "
            << buffer_size;

        std::unique_ptr<rocksdb::DB> db;
        {
            rocksdb::Options options;
            options.create_if_missing = false;
            options.max_open_files = 2000;  // 5000?
            rocksdb::DB* pdb = nullptr;
            rocksdb::Status status =
                rocksdb::DB::OpenForReadOnly(options, from_path, &pdb);
            if (!status.ok() || !pdb)
                Throw<std::runtime_error>(
                    "Can't open '" + from_path + "': " + status.ToString());
            db.reset(pdb);
        }
        // Create data file with values
        std::size_t nitems = 0;
        dat_file_header dh;
        dh.version = currentVersion;
        dh.uid = make_uid();
        dh.appnum = 1;
        dh.key_size = 32;

        native_file df;
        error_code ec;
        df.create(file_mode::append, dp, ec);
        if (ec)
            Throw<nudb::system_error>(ec);
        bulk_writer<native_file> dw(df, 0, bulk_size);
        {
            {
                auto os = dw.prepare(dat_file_header::size, ec);
                if (ec)
                    Throw<nudb::system_error>(ec);
                write(os, dh);
            }
            rocksdb::ReadOptions options;
            options.verify_checksums = false;
            options.fill_cache = false;
            std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(options));

            buffer buf;
            for (it->SeekToFirst(); it->Valid(); it->Next())
            {
                if (it->key().size() != 32)
                    Throw<std::runtime_error>(
                        "Unexpected key size " +
                        std::to_string(it->key().size()));
                void const* const key = it->key().data();
                void const* const data = it->value().data();
                auto const size = it->value().size();
                std::unique_ptr<char[]> clean(new char[size]);
                std::memcpy(clean.get(), data, size);
                filter_inner(clean.get(), size);
                auto const out = nodeobject_compress(clean.get(), size, buf);
                // Verify codec correctness
                {
                    buffer buf2;
                    auto const check =
                        nodeobject_decompress(out.first, out.second, buf2);
                    BEAST_EXPECT(check.second == size);
                    BEAST_EXPECT(
                        std::memcmp(check.first, clean.get(), size) == 0);
                }
                // Data Record
                auto os = dw.prepare(
                    field<uint48_t>::size +  // Size
                        32 +                 // Key
                        out.second,
                    ec);
                if (ec)
                    Throw<nudb::system_error>(ec);
                write<uint48_t>(os, out.second);
                std::memcpy(os.data(32), key, 32);
                std::memcpy(os.data(out.second), out.first, out.second);
                ++nitems;
            }
            dw.flush(ec);
            if (ec)
                Throw<nudb::system_error>(ec);
        }
        db.reset();
        log << "Import data: "
            << detail::fmtdur(std::chrono::steady_clock::now() - start);
        auto const df_size = df.size(ec);
        if (ec)
            Throw<nudb::system_error>(ec);
        // Create key file
        key_file_header kh;
        kh.version = currentVersion;
        kh.uid = dh.uid;
        kh.appnum = dh.appnum;
        kh.key_size = 32;
        kh.salt = make_salt();
        kh.pepper = pepper<hash_type>(kh.salt);
        kh.block_size = block_size(kp);
        kh.load_factor = std::min<std::size_t>(65536.0 * load_factor, 65535);
        kh.buckets =
            std::ceil(nitems / (bucket_capacity(kh.block_size) * load_factor));
        kh.modulus = ceil_pow2(kh.buckets);
        native_file kf;
        kf.create(file_mode::append, kp, ec);
        if (ec)
            Throw<nudb::system_error>(ec);
        buffer buf(kh.block_size);
        {
            std::memset(buf.get(), 0, kh.block_size);
            ostream os(buf.get(), kh.block_size);
            write(os, kh);
            kf.write(0, buf.get(), kh.block_size, ec);
            if (ec)
                Throw<nudb::system_error>(ec);
        }
        // Build contiguous sequential sections of the
        // key file using multiple passes over the data.
        //
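        // Each pass pins a window of buckets in the buffer, streams the
        // entire data file once, inserts only those keys whose bucket
        // index falls inside the window, then writes the finished window
        // out to the key file sequentially.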
        auto const buckets =
            std::max<std::size_t>(1, buffer_size / kh.block_size);
        buf.reserve(buckets * kh.block_size);
        auto const passes = (kh.buckets + buckets - 1) / buckets;
        log << "items: " << nitems
            << "\n"
               "buckets: "
            << kh.buckets
            << "\n"
               "data: "
            << df_size
            << "\n"
               "passes: "
            << passes;
        progress p(df_size * passes);
        std::size_t npass = 0;
        for (std::size_t b0 = 0; b0 < kh.buckets; b0 += buckets)
        {
            auto const b1 = std::min(b0 + buckets, kh.buckets);
            // Buffered range is [b0, b1)
            auto const bn = b1 - b0;
            // Create empty buckets
            for (std::size_t i = 0; i < bn; ++i)
            {
                bucket b(kh.block_size, buf.get() + i * kh.block_size, empty);
            }
            // Insert all keys into buckets
            // Iterate Data File
            bulk_reader<native_file> r(
                df, dat_file_header::size, df_size, bulk_size);
            while (!r.eof())
            {
                auto const offset = r.offset();
                // Data Record or Spill Record
                std::size_t size;
                auto is = r.prepare(field<uint48_t>::size, ec);  // Size
                if (ec)
                    Throw<nudb::system_error>(ec);
                read<uint48_t>(is, size);
                if (size > 0)
                {
                    // Data Record
                    is = r.prepare(
                        dh.key_size +  // Key
                            size,
                        ec);  // Data
                    if (ec)
                        Throw<nudb::system_error>(ec);
                    std::uint8_t const* const key = is.data(dh.key_size);
                    auto const h = hash<hash_type>(key, kh.key_size, kh.salt);
                    auto const n = bucket_index(h, kh.buckets, kh.modulus);
                    p(log, npass * df_size + r.offset());
                    if (n < b0 || n >= b1)
                        continue;
                    bucket b(
                        kh.block_size, buf.get() + (n - b0) * kh.block_size);
                    maybe_spill(b, dw, ec);
                    if (ec)
                        Throw<nudb::system_error>(ec);
                    b.insert(offset, size, h);
                }
                else
                {
                    // VFALCO Should never get here
                    // Spill Record
                    is = r.prepare(field<std::uint16_t>::size, ec);
                    if (ec)
                        Throw<nudb::system_error>(ec);
                    read<std::uint16_t>(is, size);  // Size
                    r.prepare(size, ec);            // skip
                    if (ec)
                        Throw<nudb::system_error>(ec);
                }
            }
            kf.write(
                (b0 + 1) * kh.block_size, buf.get(), bn * kh.block_size, ec);
            if (ec)
                Throw<nudb::system_error>(ec);
            ++npass;
        }
        dw.flush(ec);
        if (ec)
            Throw<nudb::system_error>(ec);
        p.finish(log);
    }
};

BEAST_DEFINE_TESTSUITE_MANUAL(import, nodestore, ripple);
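
// This is a manual suite: it runs only when selected explicitly. A likely
// invocation (paths are placeholders, argument format per the usage text
// in run() above):
//
//   rippled --unittest=import --unittest-arg=from=<from>,to=<to>,buffer=<buffer>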

#endif

//------------------------------------------------------------------------------

} // namespace NodeStore
} // namespace ripple