xrpld
Loading...
Searching...
No Matches
RocksDBFactory.cpp
1#if XRPL_ROCKSDB_AVAILABLE
2#include <xrpl/basics/ByteUtilities.h>
3#include <xrpl/basics/Log.h>
4#include <xrpl/basics/base_uint.h>
5#include <xrpl/basics/contract.h>
6#include <xrpl/basics/safe_cast.h>
7#include <xrpl/beast/core/CurrentThreadName.h>
8#include <xrpl/beast/utility/Journal.h>
9#include <xrpl/beast/utility/instrumentation.h>
10#include <xrpl/config/BasicConfig.h>
11#include <xrpl/config/Constants.h>
12#include <xrpl/nodestore/Backend.h>
13#include <xrpl/nodestore/Factory.h>
14#include <xrpl/nodestore/Manager.h>
15#include <xrpl/nodestore/NodeObject.h>
16#include <xrpl/nodestore/Scheduler.h>
17#include <xrpl/nodestore/Types.h>
18#include <xrpl/nodestore/detail/BatchWriter.h>
19#include <xrpl/nodestore/detail/DecodedBlob.h>
20#include <xrpl/nodestore/detail/EncodedBlob.h>
21
22#include <boost/filesystem/operations.hpp>
23#include <boost/filesystem/path.hpp>
24
25#include <rocksdb/advanced_options.h>
26#include <rocksdb/cache.h>
27#include <rocksdb/compression_type.h>
28#include <rocksdb/convenience.h>
29#include <rocksdb/db.h>
30#include <rocksdb/env.h>
31#include <rocksdb/filter_policy.h>
32#include <rocksdb/iterator.h>
33#include <rocksdb/options.h>
34#include <rocksdb/slice.h>
35#include <rocksdb/table.h>
36#include <rocksdb/write_batch.h>
37
38#include <atomic>
39#include <cstddef>
40#include <functional>
41#include <memory>
42#include <stdexcept>
43#include <string>
44
45namespace xrpl::NodeStore {
46
47class RocksDBEnv : public rocksdb::EnvWrapper
48{
49public:
50 RocksDBEnv() : EnvWrapper(rocksdb::Env::Default())
51 {
52 }
53
54 struct ThreadParams
55 {
56 ThreadParams(void (*f)(void*), void* a) : f(f), a(a)
57 {
58 }
59
60 void (*f)(void*);
61 void* a;
62 };
63
64 static void
65 threadEntry(void* ptr)
66 {
67 ThreadParams const* const p(reinterpret_cast<ThreadParams*>(ptr));
68 auto const f = p->f;
69
70 void* a(p->a);
71 delete p;
72
73 static std::atomic<std::size_t> kN;
74 std::size_t const id(++kN);
76
77 f(a);
78 }
79
80 void
81 StartThread(void (*f)(void*), void* a) override
82 {
83 ThreadParams* const p(new ThreadParams(f, a));
84 EnvWrapper::StartThread(&RocksDBEnv::threadEntry, p);
85 }
86};
87
88//------------------------------------------------------------------------------
89
90class RocksDBBackend : public Backend, public BatchWriter::Callback
91{
92private:
93 std::atomic<bool> deletePath_;
94
95public:
96 beast::Journal journal;
97 size_t const keyBytes;
98 BatchWriter batch;
99 std::string name;
100 std::unique_ptr<rocksdb::DB> db;
101 int fdMinRequired = 2048;
102 rocksdb::Options options;
103
104 RocksDBBackend(
105 int keyBytes,
106 Section const& keyValues,
107 Scheduler& scheduler,
108 beast::Journal journal,
109 RocksDBEnv* env)
110 : deletePath_(false), journal(journal), keyBytes(keyBytes), batch(*this, scheduler)
111 {
112 if (!getIfExists(keyValues, Keys::kPath, name))
113 Throw<std::runtime_error>("Missing path in RocksDBFactory backend");
114
115 rocksdb::BlockBasedTableOptions tableOptions;
116 options.env = env;
117
118 bool const hardSet =
119 keyValues.exists(Keys::kHardSet) && get<bool>(keyValues, Keys::kHardSet);
120
121 if (keyValues.exists(Keys::kCacheMb))
122 {
123 auto size = get<int>(keyValues, Keys::kCacheMb);
124
125 if (!hardSet && size == 256)
126 size = 1024;
127
128 tableOptions.block_cache = rocksdb::NewLRUCache(megabytes(size));
129 }
130
131 if (auto const v = get<int>(keyValues, Keys::kFilterBits))
132 {
133 bool const filterBlocks = !keyValues.exists(Keys::kFilterFull) ||
134 (get<int>(keyValues, Keys::kFilterFull) == 0);
135 tableOptions.filter_policy.reset(rocksdb::NewBloomFilterPolicy(v, filterBlocks));
136 }
137
138 if (getIfExists(keyValues, Keys::kOpenFiles, options.max_open_files))
139 {
140 if (!hardSet && options.max_open_files == 2000)
141 options.max_open_files = 8000;
142
143 fdMinRequired = options.max_open_files + 128;
144 }
145
146 if (keyValues.exists(Keys::kFileSizeMb))
147 {
148 auto fileSizeMb = get<int>(keyValues, Keys::kFileSizeMb);
149
150 if (!hardSet && fileSizeMb == 8)
151 fileSizeMb = 256;
152
153 options.target_file_size_base = megabytes(fileSizeMb);
154 options.max_bytes_for_level_base = 5 * options.target_file_size_base;
155 options.write_buffer_size = 2 * options.target_file_size_base;
156 }
157
158 getIfExists(keyValues, Keys::kFileSizeMult, options.target_file_size_multiplier);
159
160 if (keyValues.exists(Keys::kBgThreads))
161 {
162 options.env->SetBackgroundThreads(
163 get<int>(keyValues, Keys::kBgThreads), rocksdb::Env::LOW);
164 }
165
166 if (keyValues.exists(Keys::kHighThreads))
167 {
168 auto const highThreads = get<int>(keyValues, Keys::kHighThreads);
169 options.env->SetBackgroundThreads(highThreads, rocksdb::Env::HIGH);
170
171 // If we have high-priority threads, presumably we want to
172 // use them for background flushes
173 if (highThreads > 0)
174 options.max_background_flushes = highThreads;
175 }
176
177 options.compression = rocksdb::kSnappyCompression;
178
179 getIfExists(keyValues, Keys::kBlockSize, tableOptions.block_size);
180
181 if (keyValues.exists(Keys::kUniversalCompaction) &&
182 (get<int>(keyValues, Keys::kUniversalCompaction) != 0))
183 {
184 options.compaction_style = rocksdb::kCompactionStyleUniversal;
185 options.min_write_buffer_number_to_merge = 2;
186 options.max_write_buffer_number = 6;
187 options.write_buffer_size = 6 * options.target_file_size_base;
188 }
189
190 if (keyValues.exists(Keys::kBbtOptions))
191 {
192 rocksdb::ConfigOptions const configOptions;
193 auto const s = rocksdb::GetBlockBasedTableOptionsFromString(
194 configOptions, tableOptions, get(keyValues, Keys::kBbtOptions), &tableOptions);
195 if (!s.ok())
196 {
197 Throw<std::runtime_error>(
198 std::string("Unable to set RocksDB bbt_options: ") + s.ToString());
199 }
200 }
201
202 options.table_factory.reset(NewBlockBasedTableFactory(tableOptions));
203
204 if (keyValues.exists(Keys::kOptions))
205 {
206 auto const s =
207 rocksdb::GetOptionsFromString(options, get(keyValues, Keys::kOptions), &options);
208 if (!s.ok())
209 {
210 Throw<std::runtime_error>(
211 std::string("Unable to set RocksDB options: ") + s.ToString());
212 }
213 }
214
215 std::string s1, s2;
216 rocksdb::GetStringFromDBOptions(&s1, options, "; ");
217 rocksdb::GetStringFromColumnFamilyOptions(&s2, options, "; ");
218 JLOG(journal.debug()) << "RocksDB DBOptions: " << s1;
219 JLOG(journal.debug()) << "RocksDB CFOptions: " << s2;
220 }
221
222 ~RocksDBBackend() override
223 {
224 close();
225 }
226
227 void
228 open(bool createIfMissing) override
229 {
230 if (db)
231 {
232 // LCOV_EXCL_START
233 UNREACHABLE(
234 "xrpl::NodeStore::RocksDBBackend::open : database is already "
235 "open");
236 JLOG(journal.error()) << "database is already open";
237 return;
238 // LCOV_EXCL_STOP
239 }
240 rocksdb::DB* localDb = nullptr;
241 options.create_if_missing = createIfMissing;
242 rocksdb::Status const status = rocksdb::DB::Open(options, name, &localDb);
243 if (!status.ok() || (localDb == nullptr))
244 {
245 Throw<std::runtime_error>(
246 std::string("Unable to open/create RocksDB: ") + status.ToString());
247 }
248 db.reset(localDb);
249 }
250
251 bool
252 isOpen() override
253 {
254 return static_cast<bool>(db);
255 }
256
257 void
258 close() override
259 {
260 if (db)
261 {
262 db.reset();
263 if (deletePath_)
264 {
265 boost::filesystem::path const dir = name;
266 boost::filesystem::remove_all(dir);
267 }
268 }
269 }
270
272 getName() override
273 {
274 return name;
275 }
276
277 //--------------------------------------------------------------------------
278
279 Status
280 fetch(uint256 const& hash, std::shared_ptr<NodeObject>* pObject) override
281 {
282 XRPL_ASSERT(db, "xrpl::NodeStore::RocksDBBackend::fetch : non-null database");
283 pObject->reset();
284
285 Status status = Status::Ok;
286
287 rocksdb::ReadOptions const options;
288 rocksdb::Slice const slice(reinterpret_cast<char const*>(hash.data()), keyBytes);
289
290 std::string string;
291
292 rocksdb::Status const getStatus = db->Get(options, slice, &string);
293
294 if (getStatus.ok())
295 {
296 DecodedBlob decoded(hash.data(), string.data(), string.size());
297
298 if (decoded.wasOk())
299 {
300 *pObject = decoded.createObject();
301 }
302 else
303 {
304 // Decoding failed, probably corrupted!
305 //
306 status = Status::DataCorrupt;
307 }
308 }
309 else
310 {
311 if (getStatus.IsCorruption())
312 {
313 status = Status::DataCorrupt;
314 }
315 else if (getStatus.IsNotFound())
316 {
317 status = Status::NotFound;
318 }
319 else
320 {
321 status = static_cast<Status>(
322 static_cast<int>(Status::CustomCode) + unsafeCast<int>(getStatus.code()));
323
324 JLOG(journal.error()) << getStatus.ToString();
325 }
326 }
327
328 return status;
329 }
330
331 void
332 store(std::shared_ptr<NodeObject> const& object) override
333 {
334 batch.store(object);
335 }
336
337 void
338 storeBatch(Batch const& batch) override
339 {
340 XRPL_ASSERT(
341 db,
342 "xrpl::NodeStore::RocksDBBackend::storeBatch : non-null "
343 "database");
344 rocksdb::WriteBatch wb;
345
346 for (auto const& e : batch)
347 {
348 EncodedBlob const encoded(e);
349
350 wb.Put(
351 rocksdb::Slice(reinterpret_cast<char const*>(encoded.getKey()), keyBytes),
352 rocksdb::Slice(
353 reinterpret_cast<char const*>(encoded.getData()), encoded.getSize()));
354 }
355
356 rocksdb::WriteOptions const options;
357
358 auto ret = db->Write(options, &wb);
359
360 if (!ret.ok())
361 Throw<std::runtime_error>("storeBatch failed: " + ret.ToString());
362 }
363
364 void
365 sync() override
366 {
367 }
368
369 void
370 forEach(std::function<void(std::shared_ptr<NodeObject>)> f) override
371 {
372 XRPL_ASSERT(db, "xrpl::NodeStore::RocksDBBackend::forEach : non-null database");
373 rocksdb::ReadOptions const options;
374
375 std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(options));
376
377 for (it->SeekToFirst(); it->Valid(); it->Next())
378 {
379 if (it->key().size() == keyBytes)
380 {
381 DecodedBlob decoded(it->key().data(), it->value().data(), it->value().size());
382
383 if (decoded.wasOk())
384 {
385 f(decoded.createObject());
386 }
387 else
388 {
389 // Uh oh, corrupted data!
390 JLOG(journal.fatal()) << "Corrupt NodeObject #" << it->key().ToString(true);
391 }
392 }
393 else
394 {
395 // VFALCO NOTE What does it mean to find an
396 // incorrectly sized key? Corruption?
397 JLOG(journal.fatal()) << "Bad key size = " << it->key().size();
398 }
399 }
400 }
401
402 int
403 getWriteLoad() override
404 {
405 return batch.getWriteLoad();
406 }
407
408 void
409 setDeletePath() override
410 {
411 deletePath_ = true;
412 }
413
414 //--------------------------------------------------------------------------
415
416 void
417 writeBatch(Batch const& batch) override
418 {
419 storeBatch(batch);
420 }
421
423 [[nodiscard]] int
424 fdRequired() const override
425 {
426 return fdMinRequired;
427 }
428};
429
430//------------------------------------------------------------------------------
431
432class RocksDBFactory : public Factory
433{
434private:
435 Manager& manager_;
436
437public:
438 RocksDBEnv env;
439
440 RocksDBFactory(Manager& manager) : manager_(manager)
441 {
442 manager_.insert(*this);
443 }
444
445 [[nodiscard]] std::string
446 getName() const override
447 {
448 return "RocksDB";
449 }
450
451 std::unique_ptr<Backend>
452 createInstance(
453 size_t keyBytes,
454 Section const& keyValues,
455 std::size_t,
456 Scheduler& scheduler,
457 beast::Journal journal) override
458 {
459 return std::make_unique<RocksDBBackend>(keyBytes, keyValues, scheduler, journal, &env);
460 }
461};
462
463void
464registerRocksDBFactory(Manager& manager)
465{
466 static RocksDBFactory const kInstance{manager};
467}
468
469} // namespace xrpl::NodeStore
470
471#endif
Stream fatal() const
Definition Journal.h:321
Stream error() const
Definition Journal.h:315
Stream debug() const
Definition Journal.h:297
A backend used for the NodeStore.
Definition Backend.h:19
T make_unique(T... args)
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
Status
Return codes from Backend operations.
bool getIfExists(Section const &section, std::string const &name, T &v)
void open(soci::session &s, BasicConfig const &config, std::string const &dbName)
Open a soci session.
Definition SociDB.cpp:92
T reset(T... args)
This callback does the actual writing.
Definition BatchWriter.h:25
T to_string(T... args)