// RocksDBFactory.cpp (rippled / xrpl NodeStore)
#include <xrpl/basics/rocksdb.h>

#if XRPL_ROCKSDB_AVAILABLE
#include <xrpl/basics/ByteUtilities.h>
#include <xrpl/basics/contract.h>
#include <xrpl/basics/safe_cast.h>
#include <xrpl/beast/core/CurrentThreadName.h>
#include <xrpl/nodestore/Factory.h>
#include <xrpl/nodestore/Manager.h>
#include <xrpl/nodestore/detail/BatchWriter.h>
#include <xrpl/nodestore/detail/DecodedBlob.h>
#include <xrpl/nodestore/detail/EncodedBlob.h>

#include <atomic>
#include <cstddef>
#include <functional>
#include <memory>
#include <sstream>
#include <string>
namespace xrpl {
namespace NodeStore {

20class RocksDBEnv : public rocksdb::EnvWrapper
21{
22public:
23 RocksDBEnv() : EnvWrapper(rocksdb::Env::Default())
24 {
25 }
26
27 struct ThreadParams
28 {
29 ThreadParams(void (*f_)(void*), void* a_) : f(f_), a(a_)
30 {
31 }
32
33 void (*f)(void*);
34 void* a;
35 };
36
37 static void
38 thread_entry(void* ptr)
39 {
40 ThreadParams* const p(reinterpret_cast<ThreadParams*>(ptr));
41 void (*f)(void*) = p->f;
42 void* a(p->a);
43 delete p;
44
46 std::size_t const id(++n);
48 ss << "rocksdb #" << id;
50
51 (*f)(a);
52 }
53
54 void
55 StartThread(void (*f)(void*), void* a) override
56 {
57 ThreadParams* const p(new ThreadParams(f, a));
58 EnvWrapper::StartThread(&RocksDBEnv::thread_entry, p);
59 }
60};

//------------------------------------------------------------------------------

64class RocksDBBackend : public Backend, public BatchWriter::Callback
65{
66private:
67 std::atomic<bool> m_deletePath;
68
69public:
70 beast::Journal m_journal;
71 size_t const m_keyBytes;
72 BatchWriter m_batch;
73 std::string m_name;
75 int fdRequired_ = 2048;
76 rocksdb::Options m_options;
77
78 RocksDBBackend(
79 int keyBytes,
80 Section const& keyValues,
81 Scheduler& scheduler,
82 beast::Journal journal,
83 RocksDBEnv* env)
84 : m_deletePath(false)
85 , m_journal(journal)
86 , m_keyBytes(keyBytes)
87 , m_batch(*this, scheduler)
88 {
89 if (!get_if_exists(keyValues, "path", m_name))
90 Throw<std::runtime_error>("Missing path in RocksDBFactory backend");
91
92 rocksdb::BlockBasedTableOptions table_options;
93 m_options.env = env;
94
95 bool hard_set =
96 keyValues.exists("hard_set") && get<bool>(keyValues, "hard_set");
97
98 if (keyValues.exists("cache_mb"))
99 {
100 auto size = get<int>(keyValues, "cache_mb");
101
102 if (!hard_set && size == 256)
103 size = 1024;
104
105 table_options.block_cache = rocksdb::NewLRUCache(megabytes(size));
106 }
107
108 if (auto const v = get<int>(keyValues, "filter_bits"))
109 {
110 bool const filter_blocks = !keyValues.exists("filter_full") ||
111 (get<int>(keyValues, "filter_full") == 0);
112 table_options.filter_policy.reset(
113 rocksdb::NewBloomFilterPolicy(v, filter_blocks));
114 }
115
116 if (get_if_exists(keyValues, "open_files", m_options.max_open_files))
117 {
118 if (!hard_set && m_options.max_open_files == 2000)
119 m_options.max_open_files = 8000;
120
121 fdRequired_ = m_options.max_open_files + 128;
122 }
123
124 if (keyValues.exists("file_size_mb"))
125 {
126 auto file_size_mb = get<int>(keyValues, "file_size_mb");
127
128 if (!hard_set && file_size_mb == 8)
129 file_size_mb = 256;
130
131 m_options.target_file_size_base = megabytes(file_size_mb);
132 m_options.max_bytes_for_level_base =
133 5 * m_options.target_file_size_base;
134 m_options.write_buffer_size = 2 * m_options.target_file_size_base;
135 }
136
138 keyValues, "file_size_mult", m_options.target_file_size_multiplier);
139
140 if (keyValues.exists("bg_threads"))
141 {
142 m_options.env->SetBackgroundThreads(
143 get<int>(keyValues, "bg_threads"), rocksdb::Env::LOW);
144 }
145
146 if (keyValues.exists("high_threads"))
147 {
148 auto const highThreads = get<int>(keyValues, "high_threads");
149 m_options.env->SetBackgroundThreads(
150 highThreads, rocksdb::Env::HIGH);
151
152 // If we have high-priority threads, presumably we want to
153 // use them for background flushes
154 if (highThreads > 0)
155 m_options.max_background_flushes = highThreads;
156 }
157
158 m_options.compression = rocksdb::kSnappyCompression;
159
160 get_if_exists(keyValues, "block_size", table_options.block_size);
161
162 if (keyValues.exists("universal_compaction") &&
163 (get<int>(keyValues, "universal_compaction") != 0))
164 {
165 m_options.compaction_style = rocksdb::kCompactionStyleUniversal;
166 m_options.min_write_buffer_number_to_merge = 2;
167 m_options.max_write_buffer_number = 6;
168 m_options.write_buffer_size = 6 * m_options.target_file_size_base;
169 }
170
171 if (keyValues.exists("bbt_options"))
172 {
173 rocksdb::ConfigOptions config_options;
174 auto const s = rocksdb::GetBlockBasedTableOptionsFromString(
175 config_options,
176 table_options,
177 get(keyValues, "bbt_options"),
178 &table_options);
179 if (!s.ok())
180 Throw<std::runtime_error>(
181 std::string("Unable to set RocksDB bbt_options: ") +
182 s.ToString());
183 }
184
185 m_options.table_factory.reset(NewBlockBasedTableFactory(table_options));
186
187 if (keyValues.exists("options"))
188 {
189 auto const s = rocksdb::GetOptionsFromString(
190 m_options, get(keyValues, "options"), &m_options);
191 if (!s.ok())
192 Throw<std::runtime_error>(
193 std::string("Unable to set RocksDB options: ") +
194 s.ToString());
195 }
196
197 std::string s1, s2;
198 rocksdb::GetStringFromDBOptions(&s1, m_options, "; ");
199 rocksdb::GetStringFromColumnFamilyOptions(&s2, m_options, "; ");
200 JLOG(m_journal.debug()) << "RocksDB DBOptions: " << s1;
201 JLOG(m_journal.debug()) << "RocksDB CFOptions: " << s2;
202 }
203
204 ~RocksDBBackend() override
205 {
206 close();
207 }
208
209 void
210 open(bool createIfMissing) override
211 {
212 if (m_db)
213 {
214 // LCOV_EXCL_START
215 UNREACHABLE(
216 "xrpl::NodeStore::RocksDBBackend::open : database is already "
217 "open");
218 JLOG(m_journal.error()) << "database is already open";
219 return;
220 // LCOV_EXCL_STOP
221 }
222 rocksdb::DB* db = nullptr;
223 m_options.create_if_missing = createIfMissing;
224 rocksdb::Status status = rocksdb::DB::Open(m_options, m_name, &db);
225 if (!status.ok() || !db)
226 Throw<std::runtime_error>(
227 std::string("Unable to open/create RocksDB: ") +
228 status.ToString());
229 m_db.reset(db);
230 }
231
232 bool
233 isOpen() override
234 {
235 return static_cast<bool>(m_db);
236 }
237
238 void
239 close() override
240 {
241 if (m_db)
242 {
243 m_db.reset();
244 if (m_deletePath)
245 {
246 boost::filesystem::path dir = m_name;
247 boost::filesystem::remove_all(dir);
248 }
249 }
250 }
251
253 getName() override
254 {
255 return m_name;
256 }
257
258 //--------------------------------------------------------------------------
259
260 Status
261 fetch(void const* key, std::shared_ptr<NodeObject>* pObject) override
262 {
263 XRPL_ASSERT(
264 m_db, "xrpl::NodeStore::RocksDBBackend::fetch : non-null database");
265 pObject->reset();
266
267 Status status(ok);
268
269 rocksdb::ReadOptions const options;
270 rocksdb::Slice const slice(static_cast<char const*>(key), m_keyBytes);
271
272 std::string string;
273
274 rocksdb::Status getStatus = m_db->Get(options, slice, &string);
275
276 if (getStatus.ok())
277 {
278 DecodedBlob decoded(key, string.data(), string.size());
279
280 if (decoded.wasOk())
281 {
282 *pObject = decoded.createObject();
283 }
284 else
285 {
286 // Decoding failed, probably corrupted!
287 //
289 }
290 }
291 else
292 {
293 if (getStatus.IsCorruption())
294 {
296 }
297 else if (getStatus.IsNotFound())
298 {
300 }
301 else
302 {
303 status =
304 Status(customCode + unsafe_cast<int>(getStatus.code()));
305
306 JLOG(m_journal.error()) << getStatus.ToString();
307 }
308 }
309
310 return status;
311 }
312
314 fetchBatch(std::vector<uint256 const*> const& hashes) override
315 {
317 results.reserve(hashes.size());
318 for (auto const& h : hashes)
319 {
321 Status status = fetch(h->begin(), &nObj);
322 if (status != ok)
323 results.push_back({});
324 else
325 results.push_back(nObj);
326 }
327
328 return {results, ok};
329 }
330
331 void
332 store(std::shared_ptr<NodeObject> const& object) override
333 {
334 m_batch.store(object);
335 }
336
337 void
338 storeBatch(Batch const& batch) override
339 {
340 XRPL_ASSERT(
341 m_db,
342 "xrpl::NodeStore::RocksDBBackend::storeBatch : non-null "
343 "database");
344 rocksdb::WriteBatch wb;
345
346 for (auto const& e : batch)
347 {
348 EncodedBlob encoded(e);
349
350 wb.Put(
351 rocksdb::Slice(
352 reinterpret_cast<char const*>(encoded.getKey()),
353 m_keyBytes),
354 rocksdb::Slice(
355 reinterpret_cast<char const*>(encoded.getData()),
356 encoded.getSize()));
357 }
358
359 rocksdb::WriteOptions const options;
360
361 auto ret = m_db->Write(options, &wb);
362
363 if (!ret.ok())
364 Throw<std::runtime_error>("storeBatch failed: " + ret.ToString());
365 }
366
367 void
368 sync() override
369 {
370 }
371
372 void
374 {
375 XRPL_ASSERT(
376 m_db,
377 "xrpl::NodeStore::RocksDBBackend::for_each : non-null database");
378 rocksdb::ReadOptions const options;
379
380 std::unique_ptr<rocksdb::Iterator> it(m_db->NewIterator(options));
381
382 for (it->SeekToFirst(); it->Valid(); it->Next())
383 {
384 if (it->key().size() == m_keyBytes)
385 {
386 DecodedBlob decoded(
387 it->key().data(), it->value().data(), it->value().size());
388
389 if (decoded.wasOk())
390 {
391 f(decoded.createObject());
392 }
393 else
394 {
395 // Uh oh, corrupted data!
396 JLOG(m_journal.fatal())
397 << "Corrupt NodeObject #" << it->key().ToString(true);
398 }
399 }
400 else
401 {
402 // VFALCO NOTE What does it mean to find an
403 // incorrectly sized key? Corruption?
404 JLOG(m_journal.fatal())
405 << "Bad key size = " << it->key().size();
406 }
407 }
408 }
409
410 int
411 getWriteLoad() override
412 {
413 return m_batch.getWriteLoad();
414 }
415
416 void
417 setDeletePath() override
418 {
419 m_deletePath = true;
420 }
421
422 //--------------------------------------------------------------------------
423
424 void
425 writeBatch(Batch const& batch) override
426 {
427 storeBatch(batch);
428 }
429
431 int
432 fdRequired() const override
433 {
434 return fdRequired_;
435 }
436};

//------------------------------------------------------------------------------

440class RocksDBFactory : public Factory
441{
442private:
443 Manager& manager_;
444
445public:
446 RocksDBEnv m_env;
447
448 RocksDBFactory(Manager& manager) : manager_(manager)
449 {
450 manager_.insert(*this);
451 }
452
454 getName() const override
455 {
456 return "RocksDB";
457 }
458
460 createInstance(
461 size_t keyBytes,
462 Section const& keyValues,
464 Scheduler& scheduler,
465 beast::Journal journal) override
466 {
468 keyBytes, keyValues, scheduler, journal, &m_env);
469 }
470};
472void
473registerRocksDBFactory(Manager& manager)
474{
475 static RocksDBFactory instance{manager};
476}

} // namespace NodeStore
} // namespace xrpl

#endif
// NOTE(review): the text that followed here was HTML-extraction residue —
// Doxygen cross-reference tooltips for symbols used above (beast::Journal
// streams, beast::setCurrentThreadName, NodeStore::Status, megabytes,
// get/get_if_exists, registerRocksDBFactory). It was not part of the
// original source file.