1#include <xrpl/basics/rocksdb.h>
3#if XRPL_ROCKSDB_AVAILABLE
4#include <xrpl/basics/ByteUtilities.h>
5#include <xrpl/basics/contract.h>
6#include <xrpl/basics/safe_cast.h>
7#include <xrpl/beast/core/CurrentThreadName.h>
8#include <xrpl/nodestore/Factory.h>
9#include <xrpl/nodestore/Manager.h>
10#include <xrpl/nodestore/detail/BatchWriter.h>
11#include <xrpl/nodestore/detail/DecodedBlob.h>
12#include <xrpl/nodestore/detail/EncodedBlob.h>
// RocksDBEnv: a rocksdb::EnvWrapper subclass that interposes on thread
// creation so every RocksDB thread starts in RocksDBEnv::thread_entry.
// NOTE(review): this listing is fragmentary - the leading digits are residue
// of the original line numbers, and braces plus several statements are not
// visible. The comments below describe only what the visible lines show.
20class RocksDBEnv :
public rocksdb::EnvWrapper
// Constructor: wraps the environment returned by rocksdb::Env::Default().
23 RocksDBEnv() : EnvWrapper(rocksdb::Env::Default())
// ThreadParams constructor: stores a thread function pointer in `f` and its
// opaque argument in `a`.
29 ThreadParams(
void (*f_)(
void*),
void* a_) : f(f_), a(a_)
// thread_entry: trampoline handed to the wrapped environment. It receives the
// opaque pointer and reinterprets it as the ThreadParams built by StartThread.
// NOTE(review): the rest of the body (calling p->f(p->a), freeing p) is not
// visible here - confirm against the full file.
38 thread_entry(
void* ptr)
40 ThreadParams
const*
const p(
reinterpret_cast<ThreadParams*
>(ptr));
// StartThread override: heap-allocates a ThreadParams holding (f, a) and asks
// the base EnvWrapper to start a thread at thread_entry with it.
// NOTE(review): `p` is allocated with a raw `new`; presumably thread_entry
// deletes it - confirm, since no matching delete is visible in this listing.
54 StartThread(
void (*f)(
void*),
void* a)
override
56 ThreadParams*
const p(
new ThreadParams(f, a));
57 EnvWrapper::StartThread(&RocksDBEnv::thread_entry, p);
// RocksDBBackend: derives from Backend and from BatchWriter::Callback.
// NOTE(review): only some of the data members are visible in this listing.
63class RocksDBBackend :
public Backend,
public BatchWriter::Callback
// Key length in bytes, fixed at construction; for_each compares each
// iterator key's size against it.
70 size_t const m_keyBytes;
// Defaults to 2048; the constructor recomputes it as max_open_files + 128
// when an "open_files" setting is present.
74 int fdRequired_ = 2048;
// RocksDB options, populated by the constructor and passed to
// rocksdb::DB::Open in open().
75 rocksdb::Options m_options;
// Constructor: initialises the members from (keyBytes, journal, scheduler)
// and translates the `keyValues` configuration section into RocksDB options
// (m_options and a local BlockBasedTableOptions).
// NOTE(review): the parameter list, braces and several branch bodies are not
// visible in this listing; conditions that cannot be seen are flagged below.
79 Section
const& keyValues,
83 : m_deletePath(false), m_journal(journal), m_keyBytes(keyBytes), m_batch(*this, scheduler)
// Raised with a "missing path" message; the guarding condition is not visible.
86 Throw<std::runtime_error>(
"Missing path in RocksDBFactory backend");
88 rocksdb::BlockBasedTableOptions table_options;
// hard_set is true only when the key exists and its value reads as true.
// When it is false, a few specific values below are special-cased.
91 bool const hard_set = keyValues.exists(
"hard_set") && get<bool>(keyValues,
"hard_set");
// "cache_mb": the block cache becomes an LRU cache of megabytes(size) bytes.
// The body of the `size == 256` branch is not visible in this listing.
93 if (keyValues.exists(
"cache_mb"))
95 auto size = get<int>(keyValues,
"cache_mb");
97 if (!hard_set && size == 256)
100 table_options.block_cache = rocksdb::NewLRUCache(
megabytes(size));
// "filter_bits": a non-zero value installs a bloom filter policy using `v`.
// filter_blocks is true when "filter_full" is absent or equals 0; it is
// passed as the second argument of NewBloomFilterPolicy.
103 if (
auto const v = get<int>(keyValues,
"filter_bits"))
105 bool const filter_blocks =
106 !keyValues.exists(
"filter_full") || (get<int>(keyValues,
"filter_full") == 0);
107 table_options.filter_policy.reset(rocksdb::NewBloomFilterPolicy(v, filter_blocks));
// "open_files": read straight into max_open_files. Without hard_set a value
// of exactly 2000 is replaced by 8000. fdRequired_ is then set to the
// resulting limit plus 128.
110 if (
get_if_exists(keyValues,
"open_files", m_options.max_open_files))
112 if (!hard_set && m_options.max_open_files == 2000)
113 m_options.max_open_files = 8000;
115 fdRequired_ = m_options.max_open_files + 128;
// "file_size_mb": sets target_file_size_base, then derives
// max_bytes_for_level_base (5x) and write_buffer_size (2x) from it.
// The body of the `file_size_mb == 8` branch is not visible in this listing.
118 if (keyValues.exists(
"file_size_mb"))
120 auto file_size_mb = get<int>(keyValues,
"file_size_mb");
122 if (!hard_set && file_size_mb == 8)
125 m_options.target_file_size_base =
megabytes(file_size_mb);
126 m_options.max_bytes_for_level_base = 5 * m_options.target_file_size_base;
127 m_options.write_buffer_size = 2 * m_options.target_file_size_base;
// "file_size_mult": optional override of target_file_size_multiplier.
130 get_if_exists(keyValues,
"file_size_mult", m_options.target_file_size_multiplier);
// "bg_threads": size of the environment's LOW priority thread pool.
132 if (keyValues.exists(
"bg_threads"))
134 m_options.env->SetBackgroundThreads(
135 get<int>(keyValues,
"bg_threads"), rocksdb::Env::LOW);
// "high_threads": size of the HIGH priority pool, also stored in
// max_background_flushes.
// NOTE(review): a condition may sit between these two statements in the full
// file; it is not visible here.
138 if (keyValues.exists(
"high_threads"))
140 auto const highThreads = get<int>(keyValues,
"high_threads");
141 m_options.env->SetBackgroundThreads(highThreads, rocksdb::Env::HIGH);
146 m_options.max_background_flushes = highThreads;
// Compression is set to Snappy.
149 m_options.compression = rocksdb::kSnappyCompression;
// "block_size": optional override of the table block size.
151 get_if_exists(keyValues,
"block_size", table_options.block_size);
// "universal_compaction" (non-zero): switches the compaction style and
// adjusts the write-buffer settings; write_buffer_size becomes 6x
// target_file_size_base, replacing the 2x value assigned earlier.
153 if (keyValues.exists(
"universal_compaction") &&
154 (get<int>(keyValues,
"universal_compaction") != 0))
156 m_options.compaction_style = rocksdb::kCompactionStyleUniversal;
157 m_options.min_write_buffer_number_to_merge = 2;
158 m_options.max_write_buffer_number = 6;
159 m_options.write_buffer_size = 6 * m_options.target_file_size_base;
// "bbt_options": a RocksDB option string applied on top of the current
// table_options (same object used as base and as output). The status text is
// included in the thrown error; the failure check itself is not visible.
162 if (keyValues.exists(
"bbt_options"))
164 rocksdb::ConfigOptions
const config_options;
165 auto const s = rocksdb::GetBlockBasedTableOptionsFromString(
166 config_options, table_options,
get(keyValues,
"bbt_options"), &table_options);
169 Throw<std::runtime_error>(
170 std::string(
"Unable to set RocksDB bbt_options: ") + s.ToString());
// Install the table factory built from the accumulated table_options.
174 m_options.table_factory.reset(NewBlockBasedTableFactory(table_options));
// "options": a RocksDB option string applied on top of m_options (same object
// used as base and as output). The declaration of `s` and the failure check
// are not visible in this listing.
176 if (keyValues.exists(
"options"))
179 rocksdb::GetOptionsFromString(m_options,
get(keyValues,
"options"), &m_options);
182 Throw<std::runtime_error>(
183 std::string(
"Unable to set RocksDB options: ") + s.ToString());
// Serialise the DB-level and column-family-level options ("; " separated)
// into s1 and s2 and write both to the debug log. The declarations of s1 and
// s2 are not visible in this listing.
188 rocksdb::GetStringFromDBOptions(&s1, m_options,
"; ");
189 rocksdb::GetStringFromColumnFamilyOptions(&s2, m_options,
"; ");
190 JLOG(m_journal.
debug()) <<
"RocksDB DBOptions: " << s1;
191 JLOG(m_journal.
debug()) <<
"RocksDB CFOptions: " << s2;
// Destructor override. Its body is not visible in this listing.
194 ~RocksDBBackend()
override
// open: opens the RocksDB database named by m_name using m_options.
// `createIfMissing` is copied into m_options.create_if_missing before the
// call to rocksdb::DB::Open.
// NOTE(review): the listing is fragmentary; the already-open check that
// surrounds the first message, and the tail of the function (the Throw
// arguments and what is done with `db` on success), are not visible.
200 open(
bool createIfMissing)
override
// Message fragment for the "already open" case, which is also logged at
// error level.
206 "xrpl::NodeStore::RocksDBBackend::open : database is already "
208 JLOG(m_journal.
error()) <<
"database is already open";
212 rocksdb::DB* db =
nullptr;
213 m_options.create_if_missing = createIfMissing;
214 rocksdb::Status
const status = rocksdb::DB::Open(m_options, m_name, &db);
// Failure is either a non-ok status or a null handle.
215 if (!
status.ok() || (db ==
nullptr))
217 Throw<std::runtime_error>(
// Returns true exactly when m_db converts to true (a database handle is held).
// NOTE(review): the enclosing function header is not visible in this listing.
226 return static_cast<bool>(m_db);
// Recursively removes the directory named by m_name from the filesystem.
// NOTE(review): the enclosing function and any guarding condition are not
// visible in this listing; presumably this runs only when m_deletePath is set
// - confirm against the full file.
237 boost::filesystem::path
const dir = m_name;
238 boost::filesystem::remove_all(dir);
// fetch (fragment): looks up one object in the open database.
// NOTE(review): the function header, the construction of `slice` and
// `string`, and the branch structure are only partly visible in this listing.
// Precondition: the database handle must be non-null.
254 XRPL_ASSERT(m_db,
"xrpl::NodeStore::RocksDBBackend::fetch : non-null database");
259 rocksdb::ReadOptions
const options;
// Read the stored value for `slice` into `string` with default read options.
264 rocksdb::Status
const getStatus = m_db->Get(options, slice, &
string);
// Decode the stored bytes together with the hash and materialise the object
// into *pObject.
268 DecodedBlob decoded(hash.data(),
string.data(),
string.size());
272 *pObject = decoded.createObject();
// Map the RocksDB status: corruption and not-found are handled separately
// (bodies not visible). Otherwise the status is built from customCode plus
// the numeric RocksDB status code, and the status text is logged at error
// level.
283 if (getStatus.IsCorruption())
287 else if (getStatus.IsNotFound())
293 status =
Status(customCode + unsafe_cast<int>(getStatus.code()));
295 JLOG(m_journal.
error()) << getStatus.ToString();
// Fragment of a multi-hash lookup: iterates over `hashes` and returns the
// pair {results, ok}.
// NOTE(review): the function header and the loop body are not visible in
// this listing.
307 for (
auto const& h : hashes)
321 return {results,
ok};
// Forwards `object` to m_batch.store.
// NOTE(review): the enclosing function header is not visible in this listing.
327 m_batch.store(
object);
// storeBatch: encodes every element of `batch` and commits them to the
// database through a single rocksdb::WriteBatch.
// NOTE(review): the assertion that the message fragment belongs to, the
// statements adding each encoded blob to `wb`, and the failure check before
// the Throw are not visible in this listing.
331 storeBatch(Batch
const& batch)
override
335 "xrpl::NodeStore::RocksDBBackend::storeBatch : non-null "
337 rocksdb::WriteBatch wb;
339 for (
auto const& e :
batch)
341 EncodedBlob
const encoded(e);
// Default write options; the whole batch is written in one call.
348 rocksdb::WriteOptions
const options;
350 auto ret = m_db->Write(options, &wb);
// The thrown message embeds the RocksDB status text.
353 Throw<std::runtime_error>(
"storeBatch failed: " + ret.ToString());
// for_each (fragment): walks every entry in the database from the first key
// and passes each decoded object to `f`.
// NOTE(review): the function header, creation of the iterator `it`, and the
// checks that select the two fatal-log branches are not fully visible.
// Precondition: the database handle must be non-null.
364 XRPL_ASSERT(m_db,
"xrpl::NodeStore::RocksDBBackend::for_each : non-null database");
365 rocksdb::ReadOptions
const options;
369 for (it->SeekToFirst(); it->Valid(); it->Next())
// Only keys whose size equals m_keyBytes are decoded.
371 if (it->key().size() == m_keyBytes)
373 DecodedBlob decoded(it->key().data(), it->value().data(), it->value().size());
377 f(decoded.createObject());
// Fatal log naming the key (hex form) of an object reported as corrupt.
382 JLOG(m_journal.
fatal()) <<
"Corrupt NodeObject #" << it->key().ToString(
true);
// Fatal log reporting an unexpected key size.
389 JLOG(m_journal.
fatal()) <<
"Bad key size = " << it->key().size();
// getWriteLoad: returns whatever m_batch.getWriteLoad() reports.
395 getWriteLoad()
override
397 return m_batch.getWriteLoad();
// setDeletePath override. Its body is not visible in this listing;
// presumably it sets m_deletePath - confirm against the full file.
401 setDeletePath()
override
// writeBatch override taking a batch by const reference. Its body is not
// visible in this listing; presumably this is the BatchWriter::Callback hook
// and forwards to storeBatch - confirm against the full file.
409 writeBatch(Batch
const& batch)
override
// fdRequired: const override. Its body is not visible in this listing;
// presumably it returns fdRequired_ - confirm against the full file.
416 fdRequired()
const override
// RocksDBFactory: a Factory that registers itself with a Manager when
// constructed.
// NOTE(review): member declarations and the bodies of getName and the
// instance-creating function are not visible in this listing.
424class RocksDBFactory :
public Factory
// Constructor: keeps a reference to `manager` and inserts this factory into it.
432 RocksDBFactory(Manager& manager) : manager_(manager)
434 manager_.insert(*
this);
// getName: const override; body not visible.
438 getName()
const override
// Parameters of the backend-creating function: the configuration section and
// the scheduler, among others not visible here.
446 Section
const& keyValues,
448 Scheduler& scheduler,
// A single static, const RocksDBFactory built from `manager`; constructing
// it inserts the factory into that manager.
// NOTE(review): the enclosing function header is not visible in this listing.
458 static RocksDBFactory
const instance{manager};
A generic endpoint for log messages.
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
Status
Return codes from Backend operations.
void registerRocksDBFactory(Manager &manager)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
T get(Section const &section, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.
void open(soci::session &s, BasicConfig const &config, std::string const &dbName)
Open a soci session.
constexpr auto megabytes(T value) noexcept
bool get_if_exists(Section const &section, std::string const &name, T &v)