rippled
Loading...
Searching...
No Matches
RocksDBFactory.cpp
#include <xrpl/basics/rocksdb.h>

#if XRPL_ROCKSDB_AVAILABLE
#include <xrpl/basics/ByteUtilities.h>
#include <xrpl/basics/contract.h>
#include <xrpl/basics/safe_cast.h>
#include <xrpl/beast/core/CurrentThreadName.h>
#include <xrpl/nodestore/Factory.h>
#include <xrpl/nodestore/Manager.h>
#include <xrpl/nodestore/detail/BatchWriter.h>
#include <xrpl/nodestore/detail/DecodedBlob.h>
#include <xrpl/nodestore/detail/EncodedBlob.h>

#include <atomic>
#include <cstddef>
#include <functional>
#include <memory>
#include <sstream>
#include <string>
#include <utility>
#include <vector>
17namespace ripple {
18namespace NodeStore {
19
20class RocksDBEnv : public rocksdb::EnvWrapper
21{
22public:
23 RocksDBEnv() : EnvWrapper(rocksdb::Env::Default())
24 {
25 }
26
27 struct ThreadParams
28 {
29 ThreadParams(void (*f_)(void*), void* a_) : f(f_), a(a_)
30 {
31 }
32
33 void (*f)(void*);
34 void* a;
35 };
36
37 static void
38 thread_entry(void* ptr)
39 {
40 ThreadParams* const p(reinterpret_cast<ThreadParams*>(ptr));
41 void (*f)(void*) = p->f;
42 void* a(p->a);
43 delete p;
44
46 std::size_t const id(++n);
48 ss << "rocksdb #" << id;
50
51 (*f)(a);
52 }
53
54 void
55 StartThread(void (*f)(void*), void* a) override
56 {
57 ThreadParams* const p(new ThreadParams(f, a));
58 EnvWrapper::StartThread(&RocksDBEnv::thread_entry, p);
59 }
60};
61
62//------------------------------------------------------------------------------
63
64class RocksDBBackend : public Backend, public BatchWriter::Callback
65{
66private:
67 std::atomic<bool> m_deletePath;
68
69public:
70 beast::Journal m_journal;
71 size_t const m_keyBytes;
72 BatchWriter m_batch;
73 std::string m_name;
75 int fdRequired_ = 2048;
76 rocksdb::Options m_options;
77
78 RocksDBBackend(
79 int keyBytes,
80 Section const& keyValues,
81 Scheduler& scheduler,
82 beast::Journal journal,
83 RocksDBEnv* env)
84 : m_deletePath(false)
85 , m_journal(journal)
86 , m_keyBytes(keyBytes)
87 , m_batch(*this, scheduler)
88 {
89 if (!get_if_exists(keyValues, "path", m_name))
90 Throw<std::runtime_error>("Missing path in RocksDBFactory backend");
91
92 rocksdb::BlockBasedTableOptions table_options;
93 m_options.env = env;
94
95 bool hard_set =
96 keyValues.exists("hard_set") && get<bool>(keyValues, "hard_set");
97
98 if (keyValues.exists("cache_mb"))
99 {
100 auto size = get<int>(keyValues, "cache_mb");
101
102 if (!hard_set && size == 256)
103 size = 1024;
104
105 table_options.block_cache = rocksdb::NewLRUCache(megabytes(size));
106 }
107
108 if (auto const v = get<int>(keyValues, "filter_bits"))
109 {
110 bool const filter_blocks = !keyValues.exists("filter_full") ||
111 (get<int>(keyValues, "filter_full") == 0);
112 table_options.filter_policy.reset(
113 rocksdb::NewBloomFilterPolicy(v, filter_blocks));
114 }
115
116 if (get_if_exists(keyValues, "open_files", m_options.max_open_files))
117 {
118 if (!hard_set && m_options.max_open_files == 2000)
119 m_options.max_open_files = 8000;
120
121 fdRequired_ = m_options.max_open_files + 128;
122 }
123
124 if (keyValues.exists("file_size_mb"))
125 {
126 auto file_size_mb = get<int>(keyValues, "file_size_mb");
127
128 if (!hard_set && file_size_mb == 8)
129 file_size_mb = 256;
130
131 m_options.target_file_size_base = megabytes(file_size_mb);
132 m_options.max_bytes_for_level_base =
133 5 * m_options.target_file_size_base;
134 m_options.write_buffer_size = 2 * m_options.target_file_size_base;
135 }
136
138 keyValues, "file_size_mult", m_options.target_file_size_multiplier);
139
140 if (keyValues.exists("bg_threads"))
141 {
142 m_options.env->SetBackgroundThreads(
143 get<int>(keyValues, "bg_threads"), rocksdb::Env::LOW);
144 }
145
146 if (keyValues.exists("high_threads"))
147 {
148 auto const highThreads = get<int>(keyValues, "high_threads");
149 m_options.env->SetBackgroundThreads(
150 highThreads, rocksdb::Env::HIGH);
151
152 // If we have high-priority threads, presumably we want to
153 // use them for background flushes
154 if (highThreads > 0)
155 m_options.max_background_flushes = highThreads;
156 }
157
158 m_options.compression = rocksdb::kSnappyCompression;
159
160 get_if_exists(keyValues, "block_size", table_options.block_size);
161
162 if (keyValues.exists("universal_compaction") &&
163 (get<int>(keyValues, "universal_compaction") != 0))
164 {
165 m_options.compaction_style = rocksdb::kCompactionStyleUniversal;
166 m_options.min_write_buffer_number_to_merge = 2;
167 m_options.max_write_buffer_number = 6;
168 m_options.write_buffer_size = 6 * m_options.target_file_size_base;
169 }
170
171 if (keyValues.exists("bbt_options"))
172 {
173 rocksdb::ConfigOptions config_options;
174 auto const s = rocksdb::GetBlockBasedTableOptionsFromString(
175 config_options,
176 table_options,
177 get(keyValues, "bbt_options"),
178 &table_options);
179 if (!s.ok())
180 Throw<std::runtime_error>(
181 std::string("Unable to set RocksDB bbt_options: ") +
182 s.ToString());
183 }
184
185 m_options.table_factory.reset(NewBlockBasedTableFactory(table_options));
186
187 if (keyValues.exists("options"))
188 {
189 auto const s = rocksdb::GetOptionsFromString(
190 m_options, get(keyValues, "options"), &m_options);
191 if (!s.ok())
192 Throw<std::runtime_error>(
193 std::string("Unable to set RocksDB options: ") +
194 s.ToString());
195 }
196
197 std::string s1, s2;
198 rocksdb::GetStringFromDBOptions(&s1, m_options, "; ");
199 rocksdb::GetStringFromColumnFamilyOptions(&s2, m_options, "; ");
200 JLOG(m_journal.debug()) << "RocksDB DBOptions: " << s1;
201 JLOG(m_journal.debug()) << "RocksDB CFOptions: " << s2;
202 }
203
204 ~RocksDBBackend() override
205 {
206 close();
207 }
208
209 void
210 open(bool createIfMissing) override
211 {
212 if (m_db)
213 {
214 // LCOV_EXCL_START
215 UNREACHABLE(
216 "ripple::NodeStore::RocksDBBackend::open : database is already "
217 "open");
218 JLOG(m_journal.error()) << "database is already open";
219 return;
220 // LCOV_EXCL_STOP
221 }
222 rocksdb::DB* db = nullptr;
223 m_options.create_if_missing = createIfMissing;
224 rocksdb::Status status = rocksdb::DB::Open(m_options, m_name, &db);
225 if (!status.ok() || !db)
226 Throw<std::runtime_error>(
227 std::string("Unable to open/create RocksDB: ") +
228 status.ToString());
229 m_db.reset(db);
230 }
231
232 bool
233 isOpen() override
234 {
235 return static_cast<bool>(m_db);
236 }
237
238 void
239 close() override
240 {
241 if (m_db)
242 {
243 m_db.reset();
244 if (m_deletePath)
245 {
246 boost::filesystem::path dir = m_name;
247 boost::filesystem::remove_all(dir);
248 }
249 }
250 }
251
253 getName() override
254 {
255 return m_name;
256 }
257
258 //--------------------------------------------------------------------------
259
260 Status
261 fetch(void const* key, std::shared_ptr<NodeObject>* pObject) override
262 {
263 XRPL_ASSERT(
264 m_db,
265 "ripple::NodeStore::RocksDBBackend::fetch : non-null database");
266 pObject->reset();
267
268 Status status(ok);
269
270 rocksdb::ReadOptions const options;
271 rocksdb::Slice const slice(static_cast<char const*>(key), m_keyBytes);
272
273 std::string string;
274
275 rocksdb::Status getStatus = m_db->Get(options, slice, &string);
276
277 if (getStatus.ok())
278 {
279 DecodedBlob decoded(key, string.data(), string.size());
280
281 if (decoded.wasOk())
282 {
283 *pObject = decoded.createObject();
284 }
285 else
286 {
287 // Decoding failed, probably corrupted!
288 //
290 }
291 }
292 else
293 {
294 if (getStatus.IsCorruption())
295 {
297 }
298 else if (getStatus.IsNotFound())
299 {
301 }
302 else
303 {
304 status =
305 Status(customCode + unsafe_cast<int>(getStatus.code()));
306
307 JLOG(m_journal.error()) << getStatus.ToString();
308 }
309 }
310
311 return status;
312 }
313
315 fetchBatch(std::vector<uint256 const*> const& hashes) override
316 {
318 results.reserve(hashes.size());
319 for (auto const& h : hashes)
320 {
322 Status status = fetch(h->begin(), &nObj);
323 if (status != ok)
324 results.push_back({});
325 else
326 results.push_back(nObj);
327 }
328
329 return {results, ok};
330 }
331
332 void
333 store(std::shared_ptr<NodeObject> const& object) override
334 {
335 m_batch.store(object);
336 }
337
338 void
339 storeBatch(Batch const& batch) override
340 {
341 XRPL_ASSERT(
342 m_db,
343 "ripple::NodeStore::RocksDBBackend::storeBatch : non-null "
344 "database");
345 rocksdb::WriteBatch wb;
346
347 for (auto const& e : batch)
348 {
349 EncodedBlob encoded(e);
350
351 wb.Put(
352 rocksdb::Slice(
353 reinterpret_cast<char const*>(encoded.getKey()),
354 m_keyBytes),
355 rocksdb::Slice(
356 reinterpret_cast<char const*>(encoded.getData()),
357 encoded.getSize()));
358 }
359
360 rocksdb::WriteOptions const options;
361
362 auto ret = m_db->Write(options, &wb);
363
364 if (!ret.ok())
365 Throw<std::runtime_error>("storeBatch failed: " + ret.ToString());
366 }
367
368 void
369 sync() override
370 {
371 }
372
373 void
375 {
376 XRPL_ASSERT(
377 m_db,
378 "ripple::NodeStore::RocksDBBackend::for_each : non-null database");
379 rocksdb::ReadOptions const options;
380
381 std::unique_ptr<rocksdb::Iterator> it(m_db->NewIterator(options));
382
383 for (it->SeekToFirst(); it->Valid(); it->Next())
384 {
385 if (it->key().size() == m_keyBytes)
386 {
387 DecodedBlob decoded(
388 it->key().data(), it->value().data(), it->value().size());
389
390 if (decoded.wasOk())
391 {
392 f(decoded.createObject());
393 }
394 else
395 {
396 // Uh oh, corrupted data!
397 JLOG(m_journal.fatal())
398 << "Corrupt NodeObject #" << it->key().ToString(true);
399 }
400 }
401 else
402 {
403 // VFALCO NOTE What does it mean to find an
404 // incorrectly sized key? Corruption?
405 JLOG(m_journal.fatal())
406 << "Bad key size = " << it->key().size();
407 }
408 }
409 }
410
411 int
412 getWriteLoad() override
413 {
414 return m_batch.getWriteLoad();
415 }
416
417 void
418 setDeletePath() override
419 {
420 m_deletePath = true;
421 }
422
423 //--------------------------------------------------------------------------
424
425 void
426 writeBatch(Batch const& batch) override
427 {
428 storeBatch(batch);
429 }
430
432 int
433 fdRequired() const override
434 {
435 return fdRequired_;
436 }
437};
438
439//------------------------------------------------------------------------------
440
441class RocksDBFactory : public Factory
442{
443private:
444 Manager& manager_;
445
446public:
447 RocksDBEnv m_env;
448
449 RocksDBFactory(Manager& manager) : manager_(manager)
450 {
451 manager_.insert(*this);
452 }
453
455 getName() const override
456 {
457 return "RocksDB";
458 }
459
461 createInstance(
462 size_t keyBytes,
463 Section const& keyValues,
465 Scheduler& scheduler,
466 beast::Journal journal) override
467 {
469 keyBytes, keyValues, scheduler, journal, &m_env);
470 }
471};
472
473void
474registerRocksDBFactory(Manager& manager)
475{
476 static RocksDBFactory instance{manager};
477}
478
479} // namespace NodeStore
480} // namespace ripple
481
482#endif
A generic endpoint for log messages.
Definition Journal.h:41
Stream fatal() const
Definition Journal.h:333
Stream error() const
Definition Journal.h:327
Stream debug() const
Definition Journal.h:309
T for_each(T... args)
T is_same_v
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
void registerRocksDBFactory(Manager &manager)
Status
Return codes from Backend operations.
auto const data
General field definitions, or fields used in multiple transaction namespaces.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
constexpr auto megabytes(T value) noexcept
bool get_if_exists(Section const &section, std::string const &name, T &v)
void open(soci::session &s, BasicConfig const &config, std::string const &dbName)
Open a soci session.
Definition SociDB.cpp:82
T get(Section const &section, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.
T push_back(T... args)
T reserve(T... args)
T reset(T... args)
T size(T... args)
T str(T... args)