1#include <xrpl/basics/rocksdb.h> 
    3#if XRPL_ROCKSDB_AVAILABLE 
    4#include <xrpl/basics/ByteUtilities.h> 
    5#include <xrpl/basics/contract.h> 
    6#include <xrpl/basics/safe_cast.h> 
    7#include <xrpl/beast/core/CurrentThreadName.h> 
    8#include <xrpl/nodestore/Factory.h> 
    9#include <xrpl/nodestore/Manager.h> 
   10#include <xrpl/nodestore/detail/BatchWriter.h> 
   11#include <xrpl/nodestore/detail/DecodedBlob.h> 
   12#include <xrpl/nodestore/detail/EncodedBlob.h> 
   20class RocksDBEnv : 
public rocksdb::EnvWrapper
 
   23    RocksDBEnv() : EnvWrapper(rocksdb::Env::Default())
 
   29        ThreadParams(
void (*f_)(
void*), 
void* a_) : f(f_), a(a_)
 
   38    thread_entry(
void* ptr)
 
   40        ThreadParams* 
const p(
reinterpret_cast<ThreadParams*
>(ptr));
 
   41        void (*f)(
void*) = p->f;
 
   48        ss << 
"rocksdb #" << id;
 
   55    StartThread(
void (*f)(
void*), 
void* a)
 override 
   57        ThreadParams* 
const p(
new ThreadParams(f, a));
 
   58        EnvWrapper::StartThread(&RocksDBEnv::thread_entry, p);
 
   64class RocksDBBackend : 
public Backend, 
public BatchWriter::Callback
 
   71    size_t const m_keyBytes;
 
   75    int fdRequired_ = 2048;
 
   76    rocksdb::Options m_options;
 
   80        Section 
const& keyValues,
 
   86        , m_keyBytes(keyBytes)
 
   87        , m_batch(*this, scheduler)
 
   90            Throw<std::runtime_error>(
"Missing path in RocksDBFactory backend");
 
   92        rocksdb::BlockBasedTableOptions table_options;
 
   96            keyValues.exists(
"hard_set") && get<bool>(keyValues, 
"hard_set");
 
   98        if (keyValues.exists(
"cache_mb"))
 
  100            auto size = get<int>(keyValues, 
"cache_mb");
 
  102            if (!hard_set && size == 256)
 
  105            table_options.block_cache = rocksdb::NewLRUCache(
megabytes(size));
 
  108        if (
auto const v = get<int>(keyValues, 
"filter_bits"))
 
  110            bool const filter_blocks = !keyValues.exists(
"filter_full") ||
 
  111                (get<int>(keyValues, 
"filter_full") == 0);
 
  112            table_options.filter_policy.reset(
 
  113                rocksdb::NewBloomFilterPolicy(v, filter_blocks));
 
  116        if (
get_if_exists(keyValues, 
"open_files", m_options.max_open_files))
 
  118            if (!hard_set && m_options.max_open_files == 2000)
 
  119                m_options.max_open_files = 8000;
 
  121            fdRequired_ = m_options.max_open_files + 128;
 
  124        if (keyValues.exists(
"file_size_mb"))
 
  126            auto file_size_mb = get<int>(keyValues, 
"file_size_mb");
 
  128            if (!hard_set && file_size_mb == 8)
 
  131            m_options.target_file_size_base = 
megabytes(file_size_mb);
 
  132            m_options.max_bytes_for_level_base =
 
  133                5 * m_options.target_file_size_base;
 
  134            m_options.write_buffer_size = 2 * m_options.target_file_size_base;
 
  138            keyValues, 
"file_size_mult", m_options.target_file_size_multiplier);
 
  140        if (keyValues.exists(
"bg_threads"))
 
  142            m_options.env->SetBackgroundThreads(
 
  143                get<int>(keyValues, 
"bg_threads"), rocksdb::Env::LOW);
 
  146        if (keyValues.exists(
"high_threads"))
 
  148            auto const highThreads = get<int>(keyValues, 
"high_threads");
 
  149            m_options.env->SetBackgroundThreads(
 
  150                highThreads, rocksdb::Env::HIGH);
 
  155                m_options.max_background_flushes = highThreads;
 
  158        m_options.compression = rocksdb::kSnappyCompression;
 
  160        get_if_exists(keyValues, 
"block_size", table_options.block_size);
 
  162        if (keyValues.exists(
"universal_compaction") &&
 
  163            (get<int>(keyValues, 
"universal_compaction") != 0))
 
  165            m_options.compaction_style = rocksdb::kCompactionStyleUniversal;
 
  166            m_options.min_write_buffer_number_to_merge = 2;
 
  167            m_options.max_write_buffer_number = 6;
 
  168            m_options.write_buffer_size = 6 * m_options.target_file_size_base;
 
  171        if (keyValues.exists(
"bbt_options"))
 
  173            rocksdb::ConfigOptions config_options;
 
  174            auto const s = rocksdb::GetBlockBasedTableOptionsFromString(
 
  177                get(keyValues, 
"bbt_options"),
 
  180                Throw<std::runtime_error>(
 
  181                    std::string(
"Unable to set RocksDB bbt_options: ") +
 
  185        m_options.table_factory.reset(NewBlockBasedTableFactory(table_options));
 
  187        if (keyValues.exists(
"options"))
 
  189            auto const s = rocksdb::GetOptionsFromString(
 
  190                m_options, 
get(keyValues, 
"options"), &m_options);
 
  192                Throw<std::runtime_error>(
 
  198        rocksdb::GetStringFromDBOptions(&s1, m_options, 
"; ");
 
  199        rocksdb::GetStringFromColumnFamilyOptions(&s2, m_options, 
"; ");
 
  200        JLOG(m_journal.
debug()) << 
"RocksDB DBOptions: " << s1;
 
  201        JLOG(m_journal.
debug()) << 
"RocksDB CFOptions: " << s2;
 
  204    ~RocksDBBackend()
 override 
  210    open(
bool createIfMissing)
 override 
  216                "ripple::NodeStore::RocksDBBackend::open : database is already " 
  218            JLOG(m_journal.
error()) << 
"database is already open";
 
  222        rocksdb::DB* db = 
nullptr;
 
  223        m_options.create_if_missing = createIfMissing;
 
  224        rocksdb::Status 
status = rocksdb::DB::Open(m_options, m_name, &db);
 
  226            Throw<std::runtime_error>(
 
  235        return static_cast<bool>(m_db);
 
  246                boost::filesystem::path dir = m_name;
 
  247                boost::filesystem::remove_all(dir);
 
  265            "ripple::NodeStore::RocksDBBackend::fetch : non-null database");
 
  270        rocksdb::ReadOptions 
const options;
 
  271        rocksdb::Slice 
const slice(
static_cast<char const*
>(key), m_keyBytes);
 
  275        rocksdb::Status getStatus = m_db->Get(options, slice, &
string);
 
  279            DecodedBlob decoded(key, 
string.
data(), 
string.
size());
 
  283                *pObject = decoded.createObject();
 
  294            if (getStatus.IsCorruption())
 
  298            else if (getStatus.IsNotFound())
 
  305                    Status(customCode + unsafe_cast<int>(getStatus.code()));
 
  307                JLOG(m_journal.
error()) << getStatus.ToString();
 
  319        for (
auto const& h : hashes)
 
  329        return {results, 
ok};
 
  335        m_batch.store(
object);
 
  339    storeBatch(Batch 
const& batch)
 override 
  343            "ripple::NodeStore::RocksDBBackend::storeBatch : non-null " 
  345        rocksdb::WriteBatch wb;
 
  347        for (
auto const& e : 
batch)
 
  349            EncodedBlob encoded(e);
 
  353                    reinterpret_cast<char const*
>(encoded.getKey()),
 
  356                    reinterpret_cast<char const*
>(encoded.getData()),
 
  360        rocksdb::WriteOptions 
const options;
 
  362        auto ret = m_db->Write(options, &wb);
 
  365            Throw<std::runtime_error>(
"storeBatch failed: " + ret.ToString());
 
  378            "ripple::NodeStore::RocksDBBackend::for_each : non-null database");
 
  379        rocksdb::ReadOptions 
const options;
 
  383        for (it->SeekToFirst(); it->Valid(); it->Next())
 
  385            if (it->key().size() == m_keyBytes)
 
  388                    it->key().data(), it->value().data(), it->value().size());
 
  392                    f(decoded.createObject());
 
  397                    JLOG(m_journal.
fatal())
 
  398                        << 
"Corrupt NodeObject #" << it->key().ToString(
true);
 
  405                JLOG(m_journal.
fatal())
 
  406                    << 
"Bad key size = " << it->key().size();
 
  412    getWriteLoad()
 override 
  414        return m_batch.getWriteLoad();
 
  418    setDeletePath()
 override 
  426    writeBatch(Batch 
const& batch)
 override 
  433    fdRequired()
 const override 
  441class RocksDBFactory : 
public Factory
 
  449    RocksDBFactory(Manager& manager) : manager_(manager)
 
  451        manager_.insert(*
this);
 
  455    getName()
 const override 
  463        Section 
const& keyValues,
 
  465        Scheduler& scheduler,
 
  469            keyBytes, keyValues, scheduler, journal, &m_env);
 
  476    static RocksDBFactory instance{manager};
 
A generic endpoint for log messages.
 
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
 
void registerRocksDBFactory(Manager &manager)
 
Status
Return codes from Backend operations.
 
auto const data
General field definitions, or fields used in multiple transaction namespaces.
 
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
 
constexpr auto megabytes(T value) noexcept
 
bool get_if_exists(Section const §ion, std::string const &name, T &v)
 
void open(soci::session &s, BasicConfig const &config, std::string const &dbName)
Open a soci session.
 
T get(Section const §ion, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.