1#if XRPL_ROCKSDB_AVAILABLE
2#include <xrpl/basics/ByteUtilities.h>
3#include <xrpl/basics/Log.h>
4#include <xrpl/basics/base_uint.h>
5#include <xrpl/basics/contract.h>
6#include <xrpl/basics/safe_cast.h>
7#include <xrpl/beast/core/CurrentThreadName.h>
8#include <xrpl/beast/utility/Journal.h>
9#include <xrpl/beast/utility/instrumentation.h>
10#include <xrpl/config/BasicConfig.h>
11#include <xrpl/config/Constants.h>
12#include <xrpl/nodestore/Backend.h>
13#include <xrpl/nodestore/Factory.h>
14#include <xrpl/nodestore/Manager.h>
15#include <xrpl/nodestore/NodeObject.h>
16#include <xrpl/nodestore/Scheduler.h>
17#include <xrpl/nodestore/Types.h>
18#include <xrpl/nodestore/detail/BatchWriter.h>
19#include <xrpl/nodestore/detail/DecodedBlob.h>
20#include <xrpl/nodestore/detail/EncodedBlob.h>
22#include <boost/filesystem/operations.hpp>
23#include <boost/filesystem/path.hpp>
25#include <rocksdb/advanced_options.h>
26#include <rocksdb/cache.h>
27#include <rocksdb/compression_type.h>
28#include <rocksdb/convenience.h>
29#include <rocksdb/db.h>
30#include <rocksdb/env.h>
31#include <rocksdb/filter_policy.h>
32#include <rocksdb/iterator.h>
33#include <rocksdb/options.h>
34#include <rocksdb/slice.h>
35#include <rocksdb/table.h>
36#include <rocksdb/write_batch.h>
// RocksDBEnv: a rocksdb::EnvWrapper over the default RocksDB environment that
// overrides StartThread so threads started through it enter via the static
// threadEntry trampoline instead of calling the supplied function directly.
// NOTE(review): several lines of this class are not visible in this listing;
// the comments below describe only the fragments shown.
47class RocksDBEnv :
public rocksdb::EnvWrapper
// Wrap rocksdb::Env::Default(); calls not overridden here go to EnvWrapper.
50 RocksDBEnv() : EnvWrapper(rocksdb::Env::Default())
// ThreadParams stores the caller's thread function `f` together with its
// opaque argument `a` so both can travel through a single void*.
56 ThreadParams(
void (*f)(
void*),
void* a) : f(f), a(a)
// Trampoline passed to EnvWrapper::StartThread. `ptr` is cast back to the
// ThreadParams that StartThread allocated.
65 threadEntry(
void* ptr)
67 ThreadParams
const*
const p(
reinterpret_cast<ThreadParams*
>(ptr));
// Shared atomic counter; each entry pre-increments it to obtain `id`.
// NOTE(review): how `id` is used (e.g. for a thread name) and where `p` is
// released are on lines not shown here - confirm.
73 static std::atomic<std::size_t> kN;
74 std::size_t
const id(++kN);
// Override: package (f, a) on the heap and start the thread at threadEntry.
81 StartThread(
void (*f)(
void*),
void* a)
override
// Raw `new`: ownership of `p` passes to the started thread via the void*.
83 ThreadParams*
const p(
new ThreadParams(f, a));
84 EnvWrapper::StartThread(&RocksDBEnv::threadEntry, p);
// Data members of RocksDBBackend (class header is outside this listing).
// Initialised to false by the constructor.
// NOTE(review): presumably toggled by setDeletePath() - confirm.
93 std::atomic<bool> deletePath_;
// Logging sink used for the debug/error/fatal messages in this backend.
96 beast::Journal journal;
// Number of bytes of each hash used as the RocksDB key (see fetch/storeBatch)
// and the key length forEach accepts.
97 size_t const keyBytes;
// Handle to the opened database; isOpen() tests it with static_cast<bool>.
100 std::unique_ptr<rocksdb::DB> db;
// Value returned by fdRequired(); the constructor sets it to
// max_open_files + 128 in its open-files handling.
101 int fdMinRequired = 2048;
// RocksDB options assembled by the constructor and passed to DB::Open.
102 rocksdb::Options options;
// RocksDBBackend constructor (the line carrying the name and the first
// parameter is not visible in this listing).
// Translates the configuration section `keyValues` into rocksdb::Options and
// BlockBasedTableOptions, then logs the resulting option strings at debug
// level.
// Throws std::runtime_error for a missing path and for unparsable
// bbt_options / options strings (the guarding conditions are on lines not
// shown).
106 Section
const& keyValues,
107 Scheduler& scheduler,
108 beast::Journal journal,
110 : deletePath_(false), journal(journal), keyBytes(keyBytes), batch(*this, scheduler)
113 Throw<std::runtime_error>(
"Missing path in RocksDBFactory backend");
115 rocksdb::BlockBasedTableOptions tableOptions;
// True only when Keys::kHardSet is present and reads as true; referred to as
// `hardSet` below, where it disables the special-casing of specific values.
119 keyValues.exists(Keys::kHardSet) && get<bool>(keyValues, Keys::kHardSet);
// Block cache: LRU cache sized in megabytes from Keys::kCacheMb.
121 if (keyValues.exists(Keys::kCacheMb))
123 auto size = get<int>(keyValues, Keys::kCacheMb);
// A configured size of exactly 256 is special-cased unless hardSet; the
// statement it guards is not visible in this listing.
125 if (!hardSet && size == 256)
128 tableOptions.block_cache = rocksdb::NewLRUCache(megabytes(size));
// Bloom filter: installed only when Keys::kFilterBits reads as non-zero.
131 if (
auto const v = get<int>(keyValues, Keys::kFilterBits))
// filterBlocks is true unless Keys::kFilterFull is present and non-zero.
133 bool const filterBlocks = !keyValues.exists(Keys::kFilterFull) ||
134 (get<int>(keyValues, Keys::kFilterFull) == 0);
135 tableOptions.filter_policy.reset(rocksdb::NewBloomFilterPolicy(v, filterBlocks));
// Open files: read Keys::kOpenFiles into max_open_files when present.
138 if (
getIfExists(keyValues, Keys::kOpenFiles, options.max_open_files))
// A value of exactly 2000 is raised to 8000 unless hardSet.
140 if (!hardSet && options.max_open_files == 2000)
141 options.max_open_files = 8000;
// Descriptor requirement: open-file limit plus 128.
143 fdMinRequired = options.max_open_files + 128;
// File sizing: Keys::kFileSizeMb drives the target file size, the level base
// (5x) and the write buffer (2x).
146 if (keyValues.exists(Keys::kFileSizeMb))
148 auto fileSizeMb = get<int>(keyValues, Keys::kFileSizeMb);
// A value of exactly 8 is special-cased unless hardSet; the guarded statement
// is not visible in this listing.
150 if (!hardSet && fileSizeMb == 8)
153 options.target_file_size_base = megabytes(fileSizeMb);
154 options.max_bytes_for_level_base = 5 * options.target_file_size_base;
155 options.write_buffer_size = 2 * options.target_file_size_base;
158 getIfExists(keyValues, Keys::kFileSizeMult, options.target_file_size_multiplier);
// Background thread pools: LOW priority from Keys::kBgThreads.
160 if (keyValues.exists(Keys::kBgThreads))
162 options.env->SetBackgroundThreads(
163 get<int>(keyValues, Keys::kBgThreads), rocksdb::Env::LOW);
// HIGH priority pool from Keys::kHighThreads; the same count is used for
// max_background_flushes.
// NOTE(review): lines between 169 and 174 are not shown, so any condition on
// the flush assignment cannot be confirmed from here.
166 if (keyValues.exists(Keys::kHighThreads))
168 auto const highThreads = get<int>(keyValues, Keys::kHighThreads);
169 options.env->SetBackgroundThreads(highThreads, rocksdb::Env::HIGH);
174 options.max_background_flushes = highThreads;
177 options.compression = rocksdb::kSnappyCompression;
179 getIfExists(keyValues, Keys::kBlockSize, tableOptions.block_size);
// Universal compaction: enabled when Keys::kUniversalCompaction is non-zero;
// also adjusts write-buffer count, merge threshold and buffer size.
181 if (keyValues.exists(Keys::kUniversalCompaction) &&
182 (get<int>(keyValues, Keys::kUniversalCompaction) != 0))
184 options.compaction_style = rocksdb::kCompactionStyleUniversal;
185 options.min_write_buffer_number_to_merge = 2;
186 options.max_write_buffer_number = 6;
187 options.write_buffer_size = 6 * options.target_file_size_base;
// Free-form block-based-table options string, applied on top of the values
// set above (tableOptions is both the base and the output).
190 if (keyValues.exists(Keys::kBbtOptions))
192 rocksdb::ConfigOptions const configOptions;
193 auto const s = rocksdb::GetBlockBasedTableOptionsFromString(
194 configOptions, tableOptions, get(keyValues, Keys::kBbtOptions), &tableOptions);
197 Throw<std::runtime_error>(
198 std::string(
"Unable to set RocksDB bbt_options: ") + s.ToString());
// Install the table factory built from the final tableOptions.
202 options.table_factory.reset(NewBlockBasedTableFactory(tableOptions));
// Free-form options string, applied on top of `options` itself.
204 if (keyValues.exists(Keys::kOptions))
207 rocksdb::GetOptionsFromString(options, get(keyValues, Keys::kOptions), &options);
210 Throw<std::runtime_error>(
211 std::string(
"Unable to set RocksDB options: ") + s.ToString());
// Serialise the effective DB and column-family options ("; " separated) and
// log them at debug level.
216 rocksdb::GetStringFromDBOptions(&s1, options,
"; ");
217 rocksdb::GetStringFromColumnFamilyOptions(&s2, options,
"; ");
218 JLOG(journal.
debug()) <<
"RocksDB DBOptions: " << s1;
219 JLOG(journal.
debug()) <<
"RocksDB CFOptions: " << s2;
// Destructor override. The body is not visible in this listing.
// NOTE(review): presumably closes the database - confirm.
222 ~RocksDBBackend()
override
// Open the RocksDB database at `name` using the options built by the
// constructor.
// createIfMissing is copied into options.create_if_missing before opening.
// Logs "database is already open" at error level on the already-open path
// (the guarding condition is on a line not shown).
// Throws std::runtime_error when DB::Open reports a non-ok status or yields a
// null handle.
228 open(
bool createIfMissing)
override
234 "xrpl::NodeStore::RocksDBBackend::open : database is already "
236 JLOG(journal.
error()) <<
"database is already open";
// DB::Open fills a raw pointer; it is checked together with the status below.
240 rocksdb::DB* localDb =
nullptr;
241 options.create_if_missing = createIfMissing;
242 rocksdb::Status
const status = rocksdb::DB::Open(options, name, &localDb);
243 if (!
status.ok() || (localDb ==
nullptr))
245 Throw<std::runtime_error>(
// Reports whether `db` currently holds a database handle. The enclosing
// function header is not visible in this listing.
254 return static_cast<bool>(db);
// Fragment of the close path: recursively removes the database directory
// `name` from disk.
// NOTE(review): the condition guarding this removal (presumably deletePath_)
// is on a line not shown - confirm.
265 boost::filesystem::path
const dir = name;
266 boost::filesystem::remove_all(dir);
// fetch: look up one object by hash (function header not visible in this
// listing). Requires an open database (asserted).
// On a successful Get the stored bytes are decoded into *pObject; otherwise
// the RocksDB status is mapped onto the backend Status enum.
282 XRPL_ASSERT(db,
"xrpl::NodeStore::RocksDBBackend::fetch : non-null database");
287 rocksdb::ReadOptions
const options;
// The lookup key is the first keyBytes bytes of the hash.
288 rocksdb::Slice
const slice(
reinterpret_cast<char const*
>(hash.data()), keyBytes);
292 rocksdb::Status
const getStatus = db->Get(options, slice, &
string);
// Decode the raw value read into `string` and build the object from it.
296 DecodedBlob decoded(hash.data(),
string.data(),
string.size());
300 *pObject = decoded.createObject();
// NOTE(review): presumably the branch taken when decoding fails - the
// condition is on a line not shown.
306 status = Status::DataCorrupt;
// Map RocksDB failures: corruption -> DataCorrupt, missing key -> NotFound,
// anything else -> CustomCode offset by the RocksDB status code, with the
// status text logged at error level.
311 if (getStatus.IsCorruption())
313 status = Status::DataCorrupt;
315 else if (getStatus.IsNotFound())
317 status = Status::NotFound;
322 static_cast<int>(Status::CustomCode) + unsafeCast<int>(getStatus.code()));
324 JLOG(journal.
error()) << getStatus.ToString();
// Write every object in `batch` to the database through one
// rocksdb::WriteBatch.
// Throws std::runtime_error("storeBatch failed: ...") carrying the RocksDB
// status text (the guarding condition is on a line not shown).
338 storeBatch(Batch
const& batch)
override
342 "xrpl::NodeStore::RocksDBBackend::storeBatch : non-null "
344 rocksdb::WriteBatch wb;
346 for (
auto const& e : batch)
// Encode the object; key is keyBytes long, value is the encoded data.
348 EncodedBlob
const encoded(e);
351 rocksdb::Slice(
reinterpret_cast<char const*
>(encoded.getKey()), keyBytes),
353 reinterpret_cast<char const*
>(encoded.getData()), encoded.getSize()));
// Default write options; the whole batch is submitted in a single Write call.
356 rocksdb::WriteOptions
const options;
358 auto ret = db->Write(options, &wb);
361 Throw<std::runtime_error>(
"storeBatch failed: " + ret.ToString());
// forEach: visit every stored object (function header not visible in this
// listing). Requires an open database (asserted).
// Iterates all keys from the first; entries whose key length equals keyBytes
// are decoded and passed to `f`, and problems are logged at fatal level.
372 XRPL_ASSERT(db,
"xrpl::NodeStore::RocksDBBackend::forEach : non-null database");
373 rocksdb::ReadOptions
const options;
377 for (it->SeekToFirst(); it->Valid(); it->Next())
// Only keys of the expected length are treated as node objects.
379 if (it->key().size() == keyBytes)
381 DecodedBlob decoded(it->key().data(), it->value().data(), it->value().size());
385 f(decoded.createObject());
// NOTE(review): presumably the decode-failure branch - the condition is on a
// line not shown. The key is printed via ToString(true).
390 JLOG(journal.
fatal()) <<
"Corrupt NodeObject #" << it->key().ToString(
true);
// Key length differs from keyBytes.
397 JLOG(journal.
fatal()) <<
"Bad key size = " << it->key().size();
// Returns the write load reported by the `batch` member.
403 getWriteLoad()
override
405 return batch.getWriteLoad();
// Override; the body is not visible in this listing.
// NOTE(review): presumably sets deletePath_ so the directory is removed on
// close - confirm.
409 setDeletePath()
override
// Override taking a batch of objects; the body is not visible in this
// listing.
// NOTE(review): presumably forwards to storeBatch - confirm.
417 writeBatch(Batch
const& batch)
override
// Returns fdMinRequired: 2048 by default, or max_open_files + 128 as computed
// by the constructor.
424 fdRequired()
const override
426 return fdMinRequired;
// RocksDBFactory: Factory implementation for this backend. Registers itself
// with the supplied Manager on construction.
// NOTE(review): several lines of this class (member declarations, function
// bodies, the name of the creation method) are not visible in this listing.
432class RocksDBFactory :
public Factory
// Store the manager reference and insert this factory into it.
440 RocksDBFactory(Manager& manager) : manager_(manager)
442 manager_.insert(*
this);
// Returns the factory's name; the returned string is on a line not shown.
445 [[nodiscard]] std::string
446 getName()
const override
// Backend creation override returning a std::unique_ptr<Backend>; receives
// the configuration section, scheduler and journal (other parameters and the
// body are on lines not shown).
451 std::unique_ptr<Backend>
454 Section
const& keyValues,
456 Scheduler& scheduler,
457 beast::Journal journal)
override
// Registers the RocksDB factory with `manager`. The factory is a
// function-local static, so it is constructed (and thereby inserted into the
// manager by its constructor) only on the first call.
464registerRocksDBFactory(Manager& manager)
466 static RocksDBFactory
const kInstance{manager};
A backend used for the NodeStore.
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
Status
Return codes from Backend operations.
bool getIfExists(Section const &section, std::string const &name, T &v)
void open(soci::session &s, BasicConfig const &config, std::string const &dbName)
Open a soci session.
This callback does the actual writing.