// rippled — import_test.cpp (NodeStore RocksDB-to-NuDB import test)
#include <xrpl/basics/contract.h>
#include <xrpl/basics/rocksdb.h>
#include <xrpl/beast/clock/basic_seconds_clock.h>
#include <xrpl/beast/core/LexicalCast.h>
#include <xrpl/beast/rfc2616.h>
#include <xrpl/beast/unit_test.h>
#include <xrpl/nodestore/detail/codec.h>

#include <boost/beast/core/string.hpp>
#include <boost/regex.hpp>

#include <nudb/create.hpp>
#include <nudb/detail/format.hpp>
#include <nudb/xxhasher.hpp>

#include <algorithm>
#include <chrono>
#include <cmath>
#include <cstdint>
#include <cstring>
#include <iomanip>
#include <map>
#include <memory>
#include <ostream>
#include <sstream>
#include <stdexcept>
#include <string>
21
/*

Math: back-of-envelope I/O comparison of single-pass vs. multi-pass
key file construction, assuming:

1000 gb dat file
170 gb key file
capacity 113 keys/bucket

normal (single pass):
1,000gb data file read
19,210gb key file read (113 * 170)
19,210gb key file write

multi-pass (32gb buffer):
6 passes (170/32)
6,000gb data file read
170gb key file write

*/
42
43namespace xrpl {
44
45namespace detail {
46
48{
51 std::ios::fmtflags flags_;
52 std::ios::char_type fill_;
53
54public:
63 operator=(save_stream_state const&) = delete;
65 : os_(os), precision_(os.precision()), flags_(os.flags()), fill_(os.fill())
66 {
67 }
68};
69
70template <class Rep, class Period>
73{
74 save_stream_state const _(os);
75 using namespace std::chrono;
76 if (d < microseconds{1})
77 {
78 // use nanoseconds
79 if (d < nanoseconds{100})
80 {
81 // use floating
83 os << std::fixed << std::setprecision(1) << ns(d).count();
84 }
85 else
86 {
87 // use integral
88 os << round<nanoseconds>(d).count();
89 }
90 os << "ns";
91 }
92 else if (d < milliseconds{1})
93 {
94 // use microseconds
95 if (d < microseconds{100})
96 {
97 // use floating
99 os << std::fixed << std::setprecision(1) << ms(d).count();
100 }
101 else
102 {
103 // use integral
104 os << round<microseconds>(d).count();
105 }
106 os << "us";
107 }
108 else if (d < seconds{1})
109 {
110 // use milliseconds
111 if (d < milliseconds{100})
112 {
113 // use floating
115 os << std::fixed << std::setprecision(1) << ms(d).count();
116 }
117 else
118 {
119 // use integral
120 os << round<milliseconds>(d).count();
121 }
122 os << "ms";
123 }
124 else if (d < minutes{1})
125 {
126 // use seconds
127 if (d < seconds{100})
128 {
129 // use floating
130 using s = duration<float>;
131 os << std::fixed << std::setprecision(1) << s(d).count();
132 }
133 else
134 {
135 // use integral
136 os << round<seconds>(d).count();
137 }
138 os << "s";
139 }
140 else
141 {
142 // use minutes
143 if (d < minutes{100})
144 {
145 // use floating
147 os << std::fixed << std::setprecision(1) << m(d).count();
148 }
149 else
150 {
151 // use integral
152 os << round<minutes>(d).count();
153 }
154 os << "min";
155 }
156 return os;
157}
158
159template <class Period, class Rep>
160inline std::string
162{
164 pretty_time(ss, d);
165 return ss.str();
166}
167
168} // namespace detail
169
170namespace NodeStore {
171
172//------------------------------------------------------------------------------
173
175{
176private:
178
184 bool estimate_ = false;
185
186public:
187 explicit progress(std::size_t work) : work_(work)
188 {
189 }
190
191 template <class Log>
192 void
193 operator()(Log& log, std::size_t work)
194 {
195 using namespace std::chrono;
196 auto const now = clock_type::now();
197 if (now == now_)
198 return;
199 now_ = now;
200 auto const elapsed = now - start_;
201 if (!estimate_)
202 {
203 if (elapsed < seconds(15))
204 return;
205 estimate_ = true;
206 }
207 else if (now - report_ < std::chrono::seconds(60))
208 {
209 return;
210 }
211 auto const rate = elapsed.count() / double(work);
212 clock_type::duration const remain(
213 static_cast<clock_type::duration::rep>((work_ - work) * rate));
214 log << "Remaining: " << detail::fmtdur(remain) << " (" << work << " of " << work_ << " in "
215 << detail::fmtdur(elapsed) << ", " << (work - prev_) << " in "
216 << detail::fmtdur(now - report_) << ")";
217 report_ = now;
218 prev_ = work;
219 }
220
221 template <class Log>
222 void
223 finish(Log& log)
224 {
225 log << "Total time: " << detail::fmtdur(clock_type::now() - start_);
226 }
227};
228
231{
232 // <key> '=' <value>
233 static boost::regex const re1(
234 "^" // start of line
235 "(?:\\s*)" // whitespace (optional)
236 "([a-zA-Z][_a-zA-Z0-9]*)" // <key>
237 "(?:\\s*)" // whitespace (optional)
238 "(?:=)" // '='
239 "(?:\\s*)" // whitespace (optional)
240 "(.*\\S+)" // <value>
241 "(?:\\s*)" // whitespace (optional)
242 ,
243 boost::regex_constants::optimize);
245 auto const v = beast::rfc2616::split(s.begin(), s.end(), ',');
246 for (auto const& kv : v)
247 {
248 boost::smatch m;
249 if (!boost::regex_match(kv, m, re1))
250 Throw<std::runtime_error>("invalid parameter " + kv);
251 auto const result = map.emplace(m[1], m[2]);
252 if (!result.second)
253 Throw<std::runtime_error>("duplicate parameter " + m[1]);
254 }
255 return map;
256}
257
258//------------------------------------------------------------------------------
259
260#if XRPL_ROCKSDB_AVAILABLE
261
262class import_test : public beast::unit_test::suite
263{
264public:
265 void
266 run() override
267 {
268 testcase(beast::unit_test::abort_on_fail) << arg();
269
270 using namespace nudb;
271 using namespace nudb::detail;
272
273 pass();
274 auto const args = parse_args(arg());
275 bool usage = args.empty();
276
277 if (!usage && args.find("from") == args.end())
278 {
279 log << "Missing parameter: from";
280 usage = true;
281 }
282 if (!usage && args.find("to") == args.end())
283 {
284 log << "Missing parameter: to";
285 usage = true;
286 }
287 if (!usage && args.find("buffer") == args.end())
288 {
289 log << "Missing parameter: buffer";
290 usage = true;
291 }
292
293 if (usage)
294 {
295 log << "Usage:\n"
296 << "--unittest-arg=from=<from>,to=<to>,buffer=<buffer>\n"
297 << "from: RocksDB database to import from\n"
298 << "to: NuDB database to import to\n"
299 << "buffer: Buffer size (bigger is faster)\n"
300 << "NuDB database must not already exist.";
301 return;
302 }
303
304 // This controls the size of the bucket buffer.
305 // For a 1TB data file, a 32GB bucket buffer is suggested.
306 // The larger the buffer, the faster the import.
307 //
308 std::size_t const buffer_size = std::stoull(args.at("buffer"));
309 auto const from_path = args.at("from");
310 auto const to_path = args.at("to");
311
312 using hash_type = nudb::xxhasher;
313 auto const bulk_size = 64 * 1024 * 1024;
314 float const load_factor = 0.5;
315
316 auto const dp = to_path + ".dat";
317 auto const kp = to_path + ".key";
318
319 auto const start = std::chrono::steady_clock::now();
320
321 log << "from: " << from_path
322 << "\n"
323 "to: "
324 << to_path
325 << "\n"
326 "buffer: "
327 << buffer_size;
328
330 {
331 rocksdb::Options options;
332 options.create_if_missing = false;
333 options.max_open_files = 2000; // 5000?
334 rocksdb::DB* pdb = nullptr;
335 rocksdb::Status const status = rocksdb::DB::OpenForReadOnly(options, from_path, &pdb);
336 if (!status.ok() || (pdb == nullptr))
337 Throw<std::runtime_error>("Can't open '" + from_path + "': " + status.ToString());
338 db.reset(pdb);
339 }
340 // Create data file with values
341 std::size_t nitems = 0;
342 dat_file_header dh{};
343 dh.version = currentVersion;
344 dh.uid = make_uid();
345 dh.appnum = 1;
346 dh.key_size = 32;
347
348 native_file df;
349 error_code ec;
350 df.create(file_mode::append, dp, ec);
351 if (ec)
352 Throw<nudb::system_error>(ec);
353 bulk_writer<native_file> dw(df, 0, bulk_size);
354 {
355 {
356 auto os = dw.prepare(dat_file_header::size, ec);
357 if (ec)
358 Throw<nudb::system_error>(ec);
359 write(os, dh);
360 }
361 rocksdb::ReadOptions options;
362 options.verify_checksums = false;
363 options.fill_cache = false;
364 std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(options));
365
366 buffer buf;
367 for (it->SeekToFirst(); it->Valid(); it->Next())
368 {
369 if (it->key().size() != 32)
370 {
371 Throw<std::runtime_error>(
372 "Unexpected key size " + std::to_string(it->key().size()));
373 }
374 void const* const key = it->key().data();
375 void const* const data = it->value().data();
376 auto const size = it->value().size();
377 std::unique_ptr<char[]> const clean(new char[size]);
378 std::memcpy(clean.get(), data, size);
379 filter_inner(clean.get(), size);
380 auto const out = nodeobject_compress(clean.get(), size, buf);
381 // Verify codec correctness
382 {
383 buffer buf2;
384 auto const check = nodeobject_decompress(out.first, out.second, buf2);
385 BEAST_EXPECT(check.second == size);
386 BEAST_EXPECT(std::memcmp(check.first, clean.get(), size) == 0);
387 }
388 // Data Record
389 auto os = dw.prepare(
390 field<uint48_t>::size + // Size
391 32 + // Key
392 out.second,
393 ec);
394 if (ec)
395 Throw<nudb::system_error>(ec);
396 write<uint48_t>(os, out.second);
397 std::memcpy(os.data(32), key, 32);
398 std::memcpy(os.data(out.second), out.first, out.second);
399 ++nitems;
400 }
401 dw.flush(ec);
402 if (ec)
403 Throw<nudb::system_error>(ec);
404 }
405 db.reset();
406 log << "Import data: " << detail::fmtdur(std::chrono::steady_clock::now() - start);
407 auto const df_size = df.size(ec);
408 if (ec)
409 Throw<nudb::system_error>(ec);
410 // Create key file
411 key_file_header kh{};
412 kh.version = currentVersion;
413 kh.uid = dh.uid;
414 kh.appnum = dh.appnum;
415 kh.key_size = 32;
416 kh.salt = make_salt();
417 kh.pepper = pepper<hash_type>(kh.salt);
418 kh.block_size = block_size(kp);
419 kh.load_factor = std::min<std::size_t>(65536.0 * load_factor, 65535);
420 kh.buckets = std::ceil(nitems / (bucket_capacity(kh.block_size) * load_factor));
421 kh.modulus = ceil_pow2(kh.buckets);
422 native_file kf;
423 kf.create(file_mode::append, kp, ec);
424 if (ec)
425 Throw<nudb::system_error>(ec);
426 buffer buf(kh.block_size);
427 {
428 std::memset(buf.get(), 0, kh.block_size);
429 ostream os(buf.get(), kh.block_size);
430 write(os, kh);
431 kf.write(0, buf.get(), kh.block_size, ec);
432 if (ec)
433 Throw<nudb::system_error>(ec);
434 }
435 // Build contiguous sequential sections of the
436 // key file using multiple passes over the data.
437 //
438 auto const buckets = std::max<std::size_t>(1, buffer_size / kh.block_size);
439 buf.reserve(buckets * kh.block_size);
440 auto const passes = (kh.buckets + buckets - 1) / buckets;
441 log << "items: " << nitems
442 << "\n"
443 "buckets: "
444 << kh.buckets
445 << "\n"
446 "data: "
447 << df_size
448 << "\n"
449 "passes: "
450 << passes;
451 progress p(df_size * passes);
452 std::size_t npass = 0;
453 for (std::size_t b0 = 0; b0 < kh.buckets; b0 += buckets)
454 {
455 auto const b1 = std::min(b0 + buckets, kh.buckets);
456 // Buffered range is [b0, b1)
457 auto const bn = b1 - b0;
458 // Create empty buckets
459 for (std::size_t i = 0; i < bn; ++i)
460 {
461 bucket const b(kh.block_size, buf.get() + (i * kh.block_size), empty);
462 }
463 // Insert all keys into buckets
464 // Iterate Data File
465 bulk_reader<native_file> r(df, dat_file_header::size, df_size, bulk_size);
466 while (!r.eof())
467 {
468 auto const offset = r.offset();
469 // Data Record or Spill Record
470 std::size_t size = 0;
471 auto is = r.prepare(field<uint48_t>::size, ec); // Size
472 if (ec)
473 Throw<nudb::system_error>(ec);
474 read<uint48_t>(is, size);
475 if (size > 0)
476 {
477 // Data Record
478 is = r.prepare(
479 dh.key_size + // Key
480 size,
481 ec); // Data
482 if (ec)
483 Throw<nudb::system_error>(ec);
484 std::uint8_t const* const key = is.data(dh.key_size);
485 auto const h = hash<hash_type>(key, kh.key_size, kh.salt);
486 auto const n = bucket_index(h, kh.buckets, kh.modulus);
487 p(log, (npass * df_size) + r.offset());
488 if (n < b0 || n >= b1)
489 continue;
490 bucket b(kh.block_size, buf.get() + ((n - b0) * kh.block_size));
491 maybe_spill(b, dw, ec);
492 if (ec)
493 Throw<nudb::system_error>(ec);
494 b.insert(offset, size, h);
495 }
496 else
497 {
498 // VFALCO Should never get here
499 // Spill Record
500 is = r.prepare(field<std::uint16_t>::size, ec);
501 if (ec)
502 Throw<nudb::system_error>(ec);
503 read<std::uint16_t>(is, size); // Size
504 r.prepare(size, ec); // skip
505 if (ec)
506 Throw<nudb::system_error>(ec);
507 }
508 }
509 kf.write((b0 + 1) * kh.block_size, buf.get(), bn * kh.block_size, ec);
510 if (ec)
511 Throw<nudb::system_error>(ec);
512 ++npass;
513 }
514 dw.flush(ec);
515 if (ec)
516 Throw<nudb::system_error>(ec);
517 p.finish(log);
518 }
519};
520
521BEAST_DEFINE_TESTSUITE_MANUAL(import, nodestore, xrpl);
522
523#endif
524
525//------------------------------------------------------------------------------
526
527} // namespace NodeStore
528} // namespace xrpl
T begin(T... args)
T ceil(T... args)
A clock whose minimum resolution is one second.
typename Clock::duration duration
typename Clock::time_point time_point
A testsuite class.
Definition suite.h:51
clock_type::time_point report_
clock_type::time_point start_
progress(std::size_t work)
void operator()(Log &log, std::size_t work)
std::size_t const work_
clock_type::time_point now_
save_stream_state(std::ostream &os)
save_stream_state(save_stream_state const &)=delete
save_stream_state & operator=(save_stream_state const &)=delete
T emplace(T... args)
T end(T... args)
T fill(T... args)
T fixed(T... args)
T flags(T... args)
T is_same_v
T log(T... args)
T memcmp(T... args)
T memcpy(T... args)
T memset(T... args)
T min(T... args)
void check(bool condition, std::string const &message)
Result split(FwdIt first, FwdIt last, Char delim)
Parse a character sequence of values separated by commas.
Definition rfc2616.h:102
std::pair< void const *, std::size_t > nodeobject_compress(void const *in, std::size_t in_size, BufferFactory &&bf)
Definition codec.h:190
void filter_inner(void *in, std::size_t in_size)
Definition codec.h:282
void write(nudb::detail::ostream &os, std::size_t t)
Definition varint.h:115
std::pair< void const *, std::size_t > nodeobject_decompress(void const *in, std::size_t in_size, BufferFactory &&bf)
Definition codec.h:86
std::map< std::string, std::string, boost::beast::iless > parse_args(std::string const &s)
std::string fmtdur(std::chrono::duration< Period, Rep > const &d)
std::ostream & pretty_time(std::ostream &os, std::chrono::duration< Rep, Period > d)
auto const data
General field definitions, or fields used in multiple transaction namespaces.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
int run(int argc, char **argv)
Definition Main.cpp:332
T precision(T... args)
T reset(T... args)
T setprecision(T... args)
T size(T... args)
T stoull(T... args)
T str(T... args)
T to_string(T... args)