rippled
Loading...
Searching...
No Matches
RocksDBFactory.cpp
1#include <xrpl/basics/rocksdb.h>
2
3#if XRPL_ROCKSDB_AVAILABLE
4#include <xrpl/basics/ByteUtilities.h>
5#include <xrpl/basics/contract.h>
6#include <xrpl/basics/safe_cast.h>
7#include <xrpl/beast/core/CurrentThreadName.h>
8#include <xrpl/nodestore/Factory.h>
9#include <xrpl/nodestore/Manager.h>
10#include <xrpl/nodestore/detail/BatchWriter.h>
11#include <xrpl/nodestore/detail/DecodedBlob.h>
12#include <xrpl/nodestore/detail/EncodedBlob.h>
13
#include <atomic>
#include <cstddef>
#include <exception>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>
16
17namespace xrpl {
18namespace NodeStore {
19
20class RocksDBEnv : public rocksdb::EnvWrapper
21{
22public:
23 RocksDBEnv() : EnvWrapper(rocksdb::Env::Default())
24 {
25 }
26
27 struct ThreadParams
28 {
29 ThreadParams(void (*f_)(void*), void* a_) : f(f_), a(a_)
30 {
31 }
32
33 void (*f)(void*);
34 void* a;
35 };
36
37 static void
38 thread_entry(void* ptr)
39 {
40 ThreadParams const* const p(reinterpret_cast<ThreadParams*>(ptr));
41 auto const f = p->f;
42
43 void* a(p->a);
44 delete p;
45
47 std::size_t const id(++n);
49
50 f(a);
51 }
52
53 void
54 StartThread(void (*f)(void*), void* a) override
55 {
56 ThreadParams* const p(new ThreadParams(f, a));
57 EnvWrapper::StartThread(&RocksDBEnv::thread_entry, p);
58 }
59};
60
61//------------------------------------------------------------------------------
62
63class RocksDBBackend : public Backend, public BatchWriter::Callback
64{
65private:
66 std::atomic<bool> m_deletePath;
67
68public:
69 beast::Journal m_journal;
70 size_t const m_keyBytes;
71 BatchWriter m_batch;
72 std::string m_name;
74 int fdRequired_ = 2048;
75 rocksdb::Options m_options;
76
77 RocksDBBackend(
78 int keyBytes,
79 Section const& keyValues,
80 Scheduler& scheduler,
81 beast::Journal journal,
82 RocksDBEnv* env)
83 : m_deletePath(false), m_journal(journal), m_keyBytes(keyBytes), m_batch(*this, scheduler)
84 {
85 if (!get_if_exists(keyValues, "path", m_name))
86 Throw<std::runtime_error>("Missing path in RocksDBFactory backend");
87
88 rocksdb::BlockBasedTableOptions table_options;
89 m_options.env = env;
90
91 bool const hard_set = keyValues.exists("hard_set") && get<bool>(keyValues, "hard_set");
92
93 if (keyValues.exists("cache_mb"))
94 {
95 auto size = get<int>(keyValues, "cache_mb");
96
97 if (!hard_set && size == 256)
98 size = 1024;
99
100 table_options.block_cache = rocksdb::NewLRUCache(megabytes(size));
101 }
102
103 if (auto const v = get<int>(keyValues, "filter_bits"))
104 {
105 bool const filter_blocks =
106 !keyValues.exists("filter_full") || (get<int>(keyValues, "filter_full") == 0);
107 table_options.filter_policy.reset(rocksdb::NewBloomFilterPolicy(v, filter_blocks));
108 }
109
110 if (get_if_exists(keyValues, "open_files", m_options.max_open_files))
111 {
112 if (!hard_set && m_options.max_open_files == 2000)
113 m_options.max_open_files = 8000;
114
115 fdRequired_ = m_options.max_open_files + 128;
116 }
117
118 if (keyValues.exists("file_size_mb"))
119 {
120 auto file_size_mb = get<int>(keyValues, "file_size_mb");
121
122 if (!hard_set && file_size_mb == 8)
123 file_size_mb = 256;
124
125 m_options.target_file_size_base = megabytes(file_size_mb);
126 m_options.max_bytes_for_level_base = 5 * m_options.target_file_size_base;
127 m_options.write_buffer_size = 2 * m_options.target_file_size_base;
128 }
129
130 get_if_exists(keyValues, "file_size_mult", m_options.target_file_size_multiplier);
131
132 if (keyValues.exists("bg_threads"))
133 {
134 m_options.env->SetBackgroundThreads(
135 get<int>(keyValues, "bg_threads"), rocksdb::Env::LOW);
136 }
137
138 if (keyValues.exists("high_threads"))
139 {
140 auto const highThreads = get<int>(keyValues, "high_threads");
141 m_options.env->SetBackgroundThreads(highThreads, rocksdb::Env::HIGH);
142
143 // If we have high-priority threads, presumably we want to
144 // use them for background flushes
145 if (highThreads > 0)
146 m_options.max_background_flushes = highThreads;
147 }
148
149 m_options.compression = rocksdb::kSnappyCompression;
150
151 get_if_exists(keyValues, "block_size", table_options.block_size);
152
153 if (keyValues.exists("universal_compaction") &&
154 (get<int>(keyValues, "universal_compaction") != 0))
155 {
156 m_options.compaction_style = rocksdb::kCompactionStyleUniversal;
157 m_options.min_write_buffer_number_to_merge = 2;
158 m_options.max_write_buffer_number = 6;
159 m_options.write_buffer_size = 6 * m_options.target_file_size_base;
160 }
161
162 if (keyValues.exists("bbt_options"))
163 {
164 rocksdb::ConfigOptions const config_options;
165 auto const s = rocksdb::GetBlockBasedTableOptionsFromString(
166 config_options, table_options, get(keyValues, "bbt_options"), &table_options);
167 if (!s.ok())
168 {
169 Throw<std::runtime_error>(
170 std::string("Unable to set RocksDB bbt_options: ") + s.ToString());
171 }
172 }
173
174 m_options.table_factory.reset(NewBlockBasedTableFactory(table_options));
175
176 if (keyValues.exists("options"))
177 {
178 auto const s =
179 rocksdb::GetOptionsFromString(m_options, get(keyValues, "options"), &m_options);
180 if (!s.ok())
181 {
182 Throw<std::runtime_error>(
183 std::string("Unable to set RocksDB options: ") + s.ToString());
184 }
185 }
186
187 std::string s1, s2;
188 rocksdb::GetStringFromDBOptions(&s1, m_options, "; ");
189 rocksdb::GetStringFromColumnFamilyOptions(&s2, m_options, "; ");
190 JLOG(m_journal.debug()) << "RocksDB DBOptions: " << s1;
191 JLOG(m_journal.debug()) << "RocksDB CFOptions: " << s2;
192 }
193
194 ~RocksDBBackend() override
195 {
196 close();
197 }
198
199 void
200 open(bool createIfMissing) override
201 {
202 if (m_db)
203 {
204 // LCOV_EXCL_START
205 UNREACHABLE(
206 "xrpl::NodeStore::RocksDBBackend::open : database is already "
207 "open");
208 JLOG(m_journal.error()) << "database is already open";
209 return;
210 // LCOV_EXCL_STOP
211 }
212 rocksdb::DB* db = nullptr;
213 m_options.create_if_missing = createIfMissing;
214 rocksdb::Status const status = rocksdb::DB::Open(m_options, m_name, &db);
215 if (!status.ok() || (db == nullptr))
216 {
217 Throw<std::runtime_error>(
218 std::string("Unable to open/create RocksDB: ") + status.ToString());
219 }
220 m_db.reset(db);
221 }
222
223 bool
224 isOpen() override
225 {
226 return static_cast<bool>(m_db);
227 }
228
229 void
230 close() override
231 {
232 if (m_db)
233 {
234 m_db.reset();
235 if (m_deletePath)
236 {
237 boost::filesystem::path const dir = m_name;
238 boost::filesystem::remove_all(dir);
239 }
240 }
241 }
242
244 getName() override
245 {
246 return m_name;
247 }
248
249 //--------------------------------------------------------------------------
250
251 Status
252 fetch(uint256 const& hash, std::shared_ptr<NodeObject>* pObject) override
253 {
254 XRPL_ASSERT(m_db, "xrpl::NodeStore::RocksDBBackend::fetch : non-null database");
255 pObject->reset();
256
257 Status status(ok);
258
259 rocksdb::ReadOptions const options;
260 rocksdb::Slice const slice(std::bit_cast<char const*>(hash.data()), m_keyBytes);
261
262 std::string string;
263
264 rocksdb::Status const getStatus = m_db->Get(options, slice, &string);
265
266 if (getStatus.ok())
267 {
268 DecodedBlob decoded(hash.data(), string.data(), string.size());
269
270 if (decoded.wasOk())
271 {
272 *pObject = decoded.createObject();
273 }
274 else
275 {
276 // Decoding failed, probably corrupted!
277 //
279 }
280 }
281 else
282 {
283 if (getStatus.IsCorruption())
284 {
286 }
287 else if (getStatus.IsNotFound())
288 {
290 }
291 else
292 {
293 status = Status(customCode + unsafe_cast<int>(getStatus.code()));
294
295 JLOG(m_journal.error()) << getStatus.ToString();
296 }
297 }
298
299 return status;
300 }
301
303 fetchBatch(std::vector<uint256> const& hashes) override
304 {
306 results.reserve(hashes.size());
307 for (auto const& h : hashes)
308 {
310 Status const status = fetch(h, &nObj);
311 if (status != ok)
312 {
313 results.push_back({});
314 }
315 else
316 {
317 results.push_back(nObj);
318 }
319 }
320
321 return {results, ok};
322 }
323
324 void
325 store(std::shared_ptr<NodeObject> const& object) override
326 {
327 m_batch.store(object);
328 }
329
330 void
331 storeBatch(Batch const& batch) override
332 {
333 XRPL_ASSERT(
334 m_db,
335 "xrpl::NodeStore::RocksDBBackend::storeBatch : non-null "
336 "database");
337 rocksdb::WriteBatch wb;
338
339 for (auto const& e : batch)
340 {
341 EncodedBlob const encoded(e);
342
343 wb.Put(
344 rocksdb::Slice(std::bit_cast<char const*>(encoded.getKey()), m_keyBytes),
345 rocksdb::Slice(std::bit_cast<char const*>(encoded.getData()), encoded.getSize()));
346 }
347
348 rocksdb::WriteOptions const options;
349
350 auto ret = m_db->Write(options, &wb);
351
352 if (!ret.ok())
353 Throw<std::runtime_error>("storeBatch failed: " + ret.ToString());
354 }
355
356 void
357 sync() override
358 {
359 }
360
361 void
363 {
364 XRPL_ASSERT(m_db, "xrpl::NodeStore::RocksDBBackend::for_each : non-null database");
365 rocksdb::ReadOptions const options;
366
367 std::unique_ptr<rocksdb::Iterator> it(m_db->NewIterator(options));
368
369 for (it->SeekToFirst(); it->Valid(); it->Next())
370 {
371 if (it->key().size() == m_keyBytes)
372 {
373 DecodedBlob decoded(it->key().data(), it->value().data(), it->value().size());
374
375 if (decoded.wasOk())
376 {
377 f(decoded.createObject());
378 }
379 else
380 {
381 // Uh oh, corrupted data!
382 JLOG(m_journal.fatal()) << "Corrupt NodeObject #" << it->key().ToString(true);
383 }
384 }
385 else
386 {
387 // VFALCO NOTE What does it mean to find an
388 // incorrectly sized key? Corruption?
389 JLOG(m_journal.fatal()) << "Bad key size = " << it->key().size();
390 }
391 }
392 }
393
394 int
395 getWriteLoad() override
396 {
397 return m_batch.getWriteLoad();
398 }
399
400 void
401 setDeletePath() override
402 {
403 m_deletePath = true;
404 }
405
406 //--------------------------------------------------------------------------
407
408 void
409 writeBatch(Batch const& batch) override
410 {
411 storeBatch(batch);
412 }
413
415 int
416 fdRequired() const override
417 {
418 return fdRequired_;
419 }
420};
421
422//------------------------------------------------------------------------------
423
424class RocksDBFactory : public Factory
425{
426private:
427 Manager& manager_;
428
429public:
430 RocksDBEnv m_env;
431
432 RocksDBFactory(Manager& manager) : manager_(manager)
433 {
434 manager_.insert(*this);
435 }
436
438 getName() const override
439 {
440 return "RocksDB";
441 }
442
444 createInstance(
445 size_t keyBytes,
446 Section const& keyValues,
448 Scheduler& scheduler,
449 beast::Journal journal) override
450 {
451 return std::make_unique<RocksDBBackend>(keyBytes, keyValues, scheduler, journal, &m_env);
452 }
453};
454
455void
456registerRocksDBFactory(Manager& manager)
457{
458 static RocksDBFactory const instance{manager};
459}
460
461} // namespace NodeStore
462} // namespace xrpl
463
464#endif
A generic endpoint for log messages.
Definition Journal.h:40
Stream fatal() const
Definition Journal.h:325
Stream error() const
Definition Journal.h:319
Stream debug() const
Definition Journal.h:301
T for_each(T... args)
T is_same_v
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
Status
Return codes from Backend operations.
void registerRocksDBFactory(Manager &manager)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
T get(Section const &section, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.
void open(soci::session &s, BasicConfig const &config, std::string const &dbName)
Open a soci session.
Definition SociDB.cpp:75
constexpr auto megabytes(T value) noexcept
bool get_if_exists(Section const &section, std::string const &name, T &v)
T push_back(T... args)
T reserve(T... args)
T reset(T... args)
T size(T... args)
T to_string(T... args)