rippled
Loading...
Searching...
No Matches
SHAMapStoreImp.cpp
1#include <xrpld/app/ledger/TransactionMaster.h>
2#include <xrpld/app/misc/SHAMapStoreImp.h>
3#include <xrpld/app/rdb/backend/SQLiteDatabase.h>
4#include <xrpld/core/ConfigSections.h>
5
6#include <xrpl/beast/core/CurrentThreadName.h>
7#include <xrpl/nodestore/Scheduler.h>
8#include <xrpl/nodestore/detail/DatabaseRotatingImp.h>
9#include <xrpl/server/NetworkOPs.h>
10#include <xrpl/server/State.h>
11#include <xrpl/shamap/SHAMapMissingNode.h>
12
13#include <boost/algorithm/string/predicate.hpp>
14
15namespace xrpl {
// Initializes the state database by forwarding to
// initStateDB(sqlDb_, config, dbName) while holding mutex_.
// NOTE(review): the signature line (original line 17) is missing from this
// listing; the Doxygen index further down the page shows
// `void init(BasicConfig const &config, std::string const &dbName)` - confirm
// against the real file.
16void
18{
19 std::lock_guard const lock(mutex_);
20 initStateDB(sqlDb_, config, dbName);
21}
22
30
// Passes canDelete to xrpl::setCanDelete() on sqlDb_ while holding mutex_ and
// returns that call's result unchanged.
// NOTE(review): the return type and signature lines are missing from this
// listing; the Doxygen index shows
// `LedgerIndex setCanDelete(LedgerIndex canDelete)` - confirm.
33{
34 std::lock_guard const lock(mutex_);
35
36 return xrpl::setCanDelete(sqlDb_, canDelete);
37}
38
46
// Persists the given saved state by calling xrpl::setSavedState() on sqlDb_
// while holding mutex_.
// NOTE(review): the signature line (original line 48) is missing from this
// listing; the Doxygen index shows `void setState(SavedState const &state)`
// - confirm.
47void
49{
50 std::lock_guard const lock(mutex_);
51 xrpl::setSavedState(sqlDb_, state);
52}
53
54void
60
61//------------------------------------------------------------------------------
62
// SHAMapStoreImp constructor.
//
// Reads the node database section of the configuration, fills in RocksDB
// defaults, and - when online deletion is enabled (deleteInterval_ != 0) -
// reads the online-delete tuning keys, validates the interval, initializes the
// state database and runs dbPaths().
//
// Throws std::runtime_error when the node database section is empty, when
// online_delete is below minInterval, or when LEDGER_HISTORY exceeds
// online_delete.
//
// NOTE(review): the opening line of the definition (original line 63) is
// missing from this listing; the Doxygen index shows
// `SHAMapStoreImp(Application &app, NodeStore::Scheduler &scheduler,
// beast::Journal journal)`.
64 Application& app,
65 NodeStore::Scheduler& scheduler,
66 beast::Journal journal)
67 : app_(app)
68 , scheduler_(scheduler)
69 , journal_(journal)
70 , working_(true)
// canDelete_ starts at the largest representable LedgerIndex.
71 , canDelete_(std::numeric_limits<LedgerIndex>::max())
72{
73 Config& config{app.config()};
74
75 Section& section{config.section(ConfigSection::nodeDatabase())};
// An empty node database section is a fatal configuration error.
76 if (section.empty())
77 {
78 Throw<std::runtime_error>(
79 "Missing [" + ConfigSection::nodeDatabase() + "] entry in configuration file");
80 }
81
82 // RocksDB only. Use sensible defaults if no values specified.
83 if (boost::iequals(get(section, "type"), "RocksDB"))
84 {
85 if (!section.exists("cache_mb"))
86 {
87 section.set("cache_mb", std::to_string(config.getValueFor(SizedItem::hashNodeDBCache)));
88 }
89
// filter_bits is defaulted only when NODE_SIZE is at least 2.
90 if (!section.exists("filter_bits") && (config.NODE_SIZE >= 2))
91 section.set("filter_bits", "10");
92 }
93
94 get_if_exists(section, "online_delete", deleteInterval_);
95
// Everything below applies only when online deletion is enabled.
96 if (deleteInterval_ != 0u)
97 {
98 // Configuration that affects the behavior of online delete
99 get_if_exists(section, "delete_batch", deleteBatch_);
100 std::uint32_t temp = 0;
101 if (get_if_exists(section, "back_off_milliseconds", temp) ||
102 // Included for backward compatibility with an undocumented setting
103 get_if_exists(section, "backOff", temp))
104 {
106 }
// NOTE(review): the statements that consume `temp` (original lines 105, 108
// and 110) are missing from this listing. Presumably they assign backOff_,
// ageThreshold_ and recoveryWaitTime_ respectively - confirm against the
// real file.
107 if (get_if_exists(section, "age_threshold_seconds", temp))
109 if (get_if_exists(section, "recovery_wait_seconds", temp))
111
112 get_if_exists(section, "advisory_delete", advisoryDelete_);
113
// NOTE(review): the initializer of minInterval (original line 115) is missing
// from this listing.
114 auto const minInterval =
116 if (deleteInterval_ < minInterval)
117 {
118 Throw<std::runtime_error>(
119 "online_delete must be at least " + std::to_string(minInterval));
120 }
121
// The configured ledger history must fit inside the deletion interval.
122 if (config.LEDGER_HISTORY > deleteInterval_)
123 {
124 Throw<std::runtime_error>(
125 "online_delete must not be less than ledger_history "
126 "(currently " +
127 std::to_string(config.LEDGER_HISTORY) + ")");
128 }
129
// Initialize the state database first; dbPaths() reads the saved state from it.
130 state_db_.init(config, dbName_);
131 dbPaths();
132 }
133}
134
// Builds the NodeStore database and returns it in `db`.
//
// With online deletion enabled (deleteInterval_ != 0) two rotating backends are
// created from the saved state and wrapped in a rotating database; otherwise a
// single database is created. In both cases fdRequired_ is increased by the new
// database's fdRequired().
//
// NOTE(review): the signature and several body lines (original lines 135-136,
// 138-139, 155-156, 161, 168-170, 173) are missing from this listing, including
// the declarations of `db`, `nscfg` and `dbr`. The Doxygen index shows
// `std::unique_ptr< NodeStore::Database > makeNodeStore(int readThreads) override`.
137{
140
141 if (deleteInterval_ != 0u)
142 {
143 SavedState state = state_db_.getState();
144 auto writableBackend = makeBackendRotating(state.writableDb);
145 auto archiveBackend = makeBackendRotating(state.archiveDb);
// No writable backend name saved yet: record the names of the two backends
// just created so they are found again later.
146 if (state.writableDb.empty())
147 {
148 state.writableDb = writableBackend->getName();
149 state.archiveDb = archiveBackend->getName();
150 state_db_.setState(state);
151 }
152
153 // Create NodeStore with two backends to allow online deletion of
154 // data
157 readThreads,
158 std::move(writableBackend),
159 std::move(archiveBackend),
160 nscfg,
162 fdRequired_ += dbr->fdRequired();
// dbRotating_ keeps a non-owning pointer; ownership moves from dbr into db.
163 dbRotating_ = dbr.get();
164 db.reset(dynamic_cast<NodeStore::Database*>(dbr.release()));
165 }
166 else
167 {
171 readThreads,
172 nscfg,
174 fdRequired_ += db->fdRequired();
175 }
176 return db;
177}
178
// Records the given ledger in newLedger_ and sets working_ to true, both under
// mutex_. The Doxygen index describes onLedgerClosed as "Called by LedgerMaster
// every time a ledger validates."
// NOTE(review): the signature (original line 180) and the statement after the
// locked scope (original line 187) are missing from this listing; the latter
// presumably notifies cond_, which run() waits on - confirm.
179void
181{
182 {
183 std::lock_guard const lock(mutex_);
184 newLedger_ = ledger;
185 working_ = true;
186 }
188}
189
// Blocks the caller until working_ becomes false.
// Returns immediately when working_ is already false; otherwise waits on the
// rendezvous_ condition variable with the predicate `!working_`.
// NOTE(review): the signature (original line 191) and the declaration of
// `lock` (original line 196) are missing from this listing; the Doxygen index
// shows `void rendezvous() const override`.
190void
192{
193 if (!working_)
194 return;
195
197 rendezvous_.wait(lock, [&] { return !working_; });
198}
199
// Returns fdRequired_. The Doxygen index describes fdRequired as "Returns the
// number of file descriptors that are needed."
// NOTE(review): the signature line (original line 201) is missing from this
// listing; the index shows `int fdRequired() const override`.
200int
202{
203 return fdRequired_;
204}
205
// Per-node callback used while copying a ledger's state map (bound in run()).
// Increments nodeCount and, on every checkHealthInterval_-th node, calls
// healthWait(); returns false if that reports `stopping`, true otherwise.
// NOTE(review): the signature (original line 207) and the statement that
// performs the copy (original lines 210-211) are missing from this listing; the
// Doxygen index shows
// `bool copyNode(std::uint64_t &nodeCount, SHAMapTreeNode const &node)`.
206bool
208{
209 // Copy a single record from node to dbRotating_
212 if ((++nodeCount % checkHealthInterval_) == 0u)
213 {
214 if (healthWait() == stopping)
215 return false;
216 }
217
218 return true;
219}
220
// Main loop of the online-delete worker; names its thread "SHAMapStore".
//
// Each iteration clears working_, returns if stop_ is set, waits on cond_, and
// picks up newLedger_ when one is available. When validatedSeq has advanced at
// least deleteInterval_ past lastRotated, canDelete_ permits it and healthWait()
// reports keepGoing, the loop performs a rotation:
//   1. clearPrior(lastRotated)
//   2. visit every node of a snapshot of the validated ledger's state map
//   3. freshen caches, create a new backend, clear caches
//   4. rotate the backends and persist the new SavedState
// The function returns as soon as any healthWait() check reports `stopping`.
//
// NOTE(review): original lines 222, 225, 227, 230, 238, 240, 273, 286, 305 and
// 321 are missing from this listing. These include the signature, the
// declarations of `lastRotated` and `lock`, the body of the advisoryDelete_
// branch, and the calls that take the callbacks shown below - read alongside
// the real file.
221void
223{
224 beast::setCurrentThreadName("SHAMapStore");
226 netOPs_ = &app_.getOPs();
228
229 if (advisoryDelete_)
231
232 while (true)
233 {
234 healthy_ = true;
235 std::shared_ptr<Ledger const> validatedLedger;
236
237 {
239 working_ = false;
241 if (stop_)
242 {
243 return;
244 }
245 cond_.wait(lock);
// Take ownership of the pending ledger; if there is none, start the next
// iteration.
246 if (newLedger_)
247 {
248 validatedLedger = std::move(newLedger_);
249 }
250 else
251 {
252 continue;
253 }
254 }
255
256 LedgerIndex const validatedSeq = validatedLedger->header().seq;
// lastRotated == 0 is treated as "not set yet": seed it with the current
// validated sequence and persist it.
257 if (lastRotated == 0u)
258 {
259 lastRotated = validatedSeq;
260 state_db_.setLastRotated(lastRotated);
261 }
262
// healthWait() is evaluated last, so it is only called once the two cheaper
// conditions already hold.
263 bool const readyToRotate = validatedSeq >= lastRotated + deleteInterval_ &&
264 canDelete_ >= lastRotated - 1 && healthWait() == keepGoing;
265
266 // will delete up to (not including) lastRotated
267 if (readyToRotate)
268 {
269 JLOG(journal_.warn()) << "rotating validatedSeq " << validatedSeq << " lastRotated "
270 << lastRotated << " deleteInterval " << deleteInterval_
271 << " canDelete_ " << canDelete_ << " state "
272 << app_.getOPs().strOperatingMode(false) << " age "
274
275 clearPrior(lastRotated);
276 if (healthWait() == stopping)
277 return;
278
279 JLOG(journal_.debug()) << "copying ledger " << validatedSeq;
280 std::uint64_t nodeCount = 0;
281
282 try
283 {
284 validatedLedger->stateMap().snapShot(false)->visitNodes(
285 std::bind(
287 this,
288 std::ref(nodeCount),
289 std::placeholders::_1));
290 }
// A missing node abandons this rotation attempt; the loop goes back to waiting
// for the next ledger.
291 catch (SHAMapMissingNode const& e)
292 {
293 JLOG(journal_.error())
294 << "Missing node while copying ledger before rotate: " << e.what();
295 continue;
296 }
297
298 if (healthWait() == stopping)
299 return;
300 // Only log if we completed without a "health" abort
301 JLOG(journal_.debug())
302 << "copied ledger " << validatedSeq << " nodecount " << nodeCount;
303
304 JLOG(journal_.debug()) << "freshening caches";
306 if (healthWait() == stopping)
307 return;
308 // Only log if we completed without a "health" abort
309 JLOG(journal_.debug()) << validatedSeq << " freshened caches";
310
311 JLOG(journal_.trace()) << "Making a new backend";
312 auto newBackend = makeBackendRotating();
313 JLOG(journal_.debug()) << validatedSeq << " new backend " << newBackend->getName();
314
315 clearCaches(validatedSeq);
316 if (healthWait() == stopping)
317 return;
318
// lastRotated is advanced before the rotate call so the callback below
// persists the new value.
319 lastRotated = validatedSeq;
320
322 std::move(newBackend),
// Callback receiving the writable and archive backend names: saves them
// together with lastRotated, then clears the caches again.
323 [&](std::string const& writableName, std::string const& archiveName) {
324 SavedState savedState;
325 savedState.writableDb = writableName;
326 savedState.archiveDb = archiveName;
327 savedState.lastRotated = lastRotated;
328 state_db_.setState(savedState);
329
330 clearCaches(validatedSeq);
331 });
332
333 JLOG(journal_.warn()) << "finished rotation " << validatedSeq;
334 }
335 }
336}
337
// Reconciles the node database directory with the saved rotation state.
//
// Steps, in order:
//   1. Ensure the configured "path" exists and is a directory (create it if
//      absent; throw std::runtime_error if it is not a directory).
//   2. If the saved writable path does not sit directly under the configured
//      path, re-root the saved writable and archive paths there and persist.
//   3. Scan the directory for the saved writable/archive entries and collect
//      other entries whose stem equals dbPrefix_.
//   4. Throw std::runtime_error("state db error") if the saved names and the
//      entries found on disk are inconsistent.
//   5. Remove the collected leftover entries.
//
// NOTE(review): the signature (original line 339), the declaration of
// `section` (original line 341) and the declaration of `pathsToDelete`
// (original line 384) are missing from this listing.
338void
340{
342 boost::filesystem::path dbPath = get(section, "path");
343
344 if (boost::filesystem::exists(dbPath))
345 {
346 if (!boost::filesystem::is_directory(dbPath))
347 {
348 journal_.error() << "node db path must be a directory. " << dbPath.string();
349 Throw<std::runtime_error>("node db path must be a directory.");
350 }
351 }
352 else
353 {
354 boost::filesystem::create_directories(dbPath);
355 }
356
357 SavedState state = state_db_.getState();
358
359 {
// update() re-roots a stored path under dbPath, keeping only its final
// component. Returns true when sPath was rewritten, false when it is empty or
// already sits directly under dbPath.
360 auto update = [&dbPath](std::string& sPath) {
361 if (sPath.empty())
362 return false;
363
364 // Check if configured "path" matches stored directory path
365 using namespace boost::filesystem;
366 auto const stored{path(sPath)};
367 if (stored.parent_path() == dbPath)
368 return false;
369
370 sPath = (dbPath / stored.filename()).string();
371 return true;
372 };
373
// The archive path is rewritten, and the state persisted, only when the
// writable path needed rewriting.
374 if (update(state.writableDb))
375 {
376 update(state.archiveDb);
377 state_db_.setState(state);
378 }
379 }
380
381 bool writableDbExists = false;
382 bool archiveDbExists = false;
383
// Classify each directory entry: saved writable db, saved archive db, or a
// leftover entry whose stem equals dbPrefix_ (queued for removal).
385 for (boost::filesystem::directory_iterator it(dbPath);
386 it != boost::filesystem::directory_iterator();
387 ++it)
388 {
389 if (state.writableDb.compare(it->path().string()) == 0)
390 {
391 writableDbExists = true;
392 }
393 else if (state.archiveDb.compare(it->path().string()) == 0)
394 {
395 archiveDbExists = true;
396 }
397 else if (dbPrefix_.compare(it->path().stem().string()) == 0)
398 {
399 pathsToDelete.push_back(it->path());
400 }
401 }
402
// Inconsistent state: a saved name with no matching entry on disk, only one of
// the two entries present, or only one of the two names saved.
403 if ((!writableDbExists && !state.writableDb.empty()) ||
404 (!archiveDbExists && !state.archiveDb.empty()) || (writableDbExists != archiveDbExists) ||
405 state.writableDb.empty() != state.archiveDb.empty())
406 {
407 boost::filesystem::path stateDbPathName = app_.config().legacy("database_path");
408 stateDbPathName /= dbName_;
409 stateDbPathName += "*";
410
// NOTE(review): the message opens a single quote before the archiveDb value
// but never closes it (compare the writableDb part) - cosmetic log defect.
411 journal_.error() << "state db error:\n"
412 << " writableDbExists " << writableDbExists << " archiveDbExists "
413 << archiveDbExists << '\n'
414 << " writableDb '" << state.writableDb << "' archiveDb '"
415 << state.archiveDb << "\n\n"
416 << "The existing data is in a corrupted state.\n"
417 << "To resume operation, remove the files matching "
418 << stateDbPathName.string() << " and contents of the directory "
419 << get(section, "path") << '\n'
420 << "Optionally, you can move those files to another\n"
421 << "location if you wish to analyze or back up the data.\n"
422 << "However, there is no guarantee that the data in its\n"
423 << "existing form is usable.";
424
425 Throw<std::runtime_error>("state db error");
426 }
427
428 // The necessary directories exist. Now, remove any others.
429 for (boost::filesystem::path const& p : pathsToDelete)
430 boost::filesystem::remove_all(p);
431}
432
// Creates and opens a backend for the rotating node store, then returns it.
//
// When `path` is non-empty it is used as-is; otherwise a new path is generated
// from the section's "path", dbPrefix_ and a ".%%%%" suffix passed through
// boost::filesystem::unique_path. The chosen path is written back into
// `section` under "path" before the backend is created.
//
// NOTE(review): the signature (original lines 433-434), the declaration of
// `section` (original line 436) and most of the backend-construction call
// (original lines 452, 454-456) are missing from this listing; the Doxygen
// index shows `std::unique_ptr< NodeStore::Backend >
// makeBackendRotating(std::string path=std::string())`.
435{
437 boost::filesystem::path newPath;
438
439 if (!path.empty())
440 {
441 newPath = path;
442 }
443 else
444 {
445 boost::filesystem::path p = get(section, "path");
446 p /= dbPrefix_;
447 p += ".%%%%";
448 newPath = boost::filesystem::unique_path(p);
449 }
450 section.set("path", newPath.string());
451
453 section,
457 backend->open();
458 return backend;
459}
460
// Deletes rows with a ledger sequence below lastRotated from one SQL table, in
// steps of at most deleteBatch_ sequences. The Doxygen index describes it as
// "delete from sqlite table in batches to not lock the db excessively."
//
// Parameters:
//   lastRotated     - exclusive upper bound of the sequences to delete.
//   TableName       - used only in log messages here.
//   getMinSeq       - returns the lowest sequence present; an empty optional
//                     makes the function return without deleting.
//   deleteBeforeSeq - invoked with each successive batch bound.
//
// Returns early whenever healthWait() reports `stopping`.
//
// NOTE(review): the line naming the function (original line 462), the
// declaration of `min` (original line 469) and the statement guarded by
// `if (min < lastRotated)` (original line 502) are missing from this listing;
// the last is presumably a back-off sleep between batches - confirm.
461void
463 LedgerIndex lastRotated,
464 std::string const& TableName,
465 std::function<std::optional<LedgerIndex>()> const& getMinSeq,
466 std::function<void(LedgerIndex)> const& deleteBeforeSeq)
467{
468 XRPL_ASSERT(deleteInterval_, "xrpl::SHAMapStoreImp::clearSql : nonzero delete interval");
470
471 {
472 JLOG(journal_.trace()) << "Begin: Look up lowest value of: " << TableName;
473 auto m = getMinSeq();
474 JLOG(journal_.trace()) << "End: Look up lowest value of: " << TableName;
475 if (!m)
476 return;
477 min = *m;
478 }
479
480 if (min > lastRotated || healthWait() == stopping)
481 return;
482 if (min == lastRotated)
483 {
484 // Micro-optimization mainly to clarify logs
485 JLOG(journal_.trace()) << "Nothing to delete from " << TableName;
486 return;
487 }
488
489 JLOG(journal_.debug()) << "start deleting in: " << TableName << " from " << min << " to "
490 << lastRotated;
// Advance `min` by one batch per pass, clamped to lastRotated, and delete
// everything below the new bound.
491 while (min < lastRotated)
492 {
493 min = std::min(lastRotated, min + deleteBatch_);
494 JLOG(journal_.trace()) << "Begin: Delete up to " << deleteBatch_
495 << " rows with LedgerSeq < " << min << " from: " << TableName;
496 deleteBeforeSeq(min);
497 JLOG(journal_.trace()) << "End: Delete up to " << deleteBatch_ << " rows with LedgerSeq < "
498 << min << " from: " << TableName;
499 if (healthWait() == stopping)
500 return;
501 if (min < lastRotated)
503 if (healthWait() == stopping)
504 return;
505 }
506 JLOG(journal_.debug()) << "finished deleting from: " << TableName;
507}
508
// Clears caches around a backend rotation; run() calls it with the validated
// ledger sequence both before rotating and inside the rotate callback.
// NOTE(review): every statement of this function (original lines 510, 512 and
// 516) is missing from this listing - only the comment below survives. The
// Doxygen index shows `void clearCaches(LedgerIndex validatedSeq)`.
509void
511{
513 // Also clear the FullBelowCache so its generation counter is bumped.
514 // This prevents stale "full below" markers from persisting across
515 // backend rotation/online deletion and interfering with SHAMap sync.
517}
518
519void
527
// Removes data older than lastRotated from the in-memory ledger store and the
// relational database.
//
// Order of work:
//   1. Raise minimumOnline_ to lastRotated + 1.
//   2. Clear internal ledgers via ledgerMaster_->clearPriorLedgers().
//   3. Batch-delete from "Ledgers" and, only when transaction tables are in
//      use, from "Transactions" and "AccountTransactions" (see clearSql).
// A healthWait() check follows every step; the function returns as soon as one
// reports `stopping`.
//
// NOTE(review): the signature line (original line 529) is missing from this
// listing; the Doxygen index shows `void clearPrior(LedgerIndex lastRotated)`.
528void
530{
531 // Do not allow ledgers to be acquired from the network
532 // that are about to be deleted.
533 minimumOnline_ = lastRotated + 1;
534 JLOG(journal_.trace()) << "Begin: Clear internal ledgers up to " << lastRotated;
535 ledgerMaster_->clearPriorLedgers(lastRotated);
536 JLOG(journal_.trace()) << "End: Clear internal ledgers up to " << lastRotated;
537 if (healthWait() == stopping)
538 return;
539
540 auto& db = app_.getRelationalDatabase();
541
542 clearSql(
543 lastRotated,
544 "Ledgers",
545 [&db]() -> std::optional<LedgerIndex> { return db.getMinLedgerSeq(); },
546 [&db](LedgerIndex min) -> void { db.deleteBeforeLedgerSeq(min); });
547 if (healthWait() == stopping)
548 return;
549
// The remaining two tables are skipped when transaction tables are disabled.
550 if (!app_.config().useTxTables())
551 return;
552
553 clearSql(
554 lastRotated,
555 "Transactions",
556 [&db]() -> std::optional<LedgerIndex> { return db.getTransactionsMinLedgerSeq(); },
557 [&db](LedgerIndex min) -> void { db.deleteTransactionsBeforeLedgerSeq(min); });
558 if (healthWait() == stopping)
559 return;
560
561 clearSql(
562 lastRotated,
563 "AccountTransactions",
564 [&db]() -> std::optional<LedgerIndex> { return db.getAccountTransactionsMinLedgerSeq(); },
565 [&db](LedgerIndex min) -> void { db.deleteAccountTransactionsBeforeLedgerSeq(min); });
566 if (healthWait() == stopping)
567 return;
568}
569
// Health gate for online deletion. The Doxygen index describes it as "a health
// check for online deletion that waits until rippled is stable before
// returning."
//
// Loops while stop_ is not set and either the operating mode is not FULL or
// `age` exceeds ageThreshold_. Each pass releases the lock, logs a warning,
// refreshes `mode`, and re-acquires the lock.
// Returns `stopping` when stop_ is set, otherwise `keepGoing`.
//
// NOTE(review): the signature (original lines 570-571), the declarations of
// `lock`, `age` and `mode` (original lines 573-575) and two statements inside
// the loop (original lines 583-584) are missing from this listing. The missing
// loop statements presumably sleep for recoveryWaitTime_ and refresh `age` -
// confirm against the real file.
572{
576 while (!stop_ && (mode != OperatingMode::FULL || age > ageThreshold_))
577 {
578 lock.unlock();
579 JLOG(journal_.warn()) << "Waiting " << recoveryWaitTime_.count()
580 << "s for node to stabilize. state: "
581 << app_.getOPs().strOperatingMode(mode, false) << ". age "
582 << age.count() << 's';
585 mode = netOPs_->getOperatingMode();
586 lock.lock();
587 }
588
589 return stop_ ? stopping : keepGoing;
590}
591
// Shuts down the worker thread: if thread_ is joinable, sets stop_ under
// mutex_ and then joins thread_. Does nothing when the thread is not joinable.
// NOTE(review): the signature (original line 593) and the statement following
// `stop_ = true` (original line 600) are missing from this listing; the latter
// presumably notifies cond_ so run() wakes up and sees stop_ - confirm.
592void
594{
595 if (thread_.joinable())
596 {
597 {
598 std::lock_guard const lock(mutex_);
599 stop_ = true;
601 }
// Join outside the locked scope.
602 thread_.join();
603 }
604}
605
// Returns minimumOnline_ when online deletion is enabled and the value is
// known (nonzero); otherwise falls back to LedgerMaster::minSqlSeq(). The
// Doxygen index describes it as "The minimum ledger to try and maintain in our
// database."
// NOTE(review): the signature (original lines 606-607) is missing from this
// listing; the index shows
// `std::optional< LedgerIndex > minimumOnline() const override`.
608{
609 // minimumOnline_ with 0 value is equivalent to unknown/not set.
610 // Don't attempt to acquire ledgers if that value is unknown.
611 if ((deleteInterval_ != 0u) && (minimumOnline_ != 0u))
612 return minimumOnline_.load();
613 return app_.getLedgerMaster().minSqlSeq();
614}
615
616//------------------------------------------------------------------------------
617
// Factory: constructs a SHAMapStoreImp from app, scheduler and journal and
// returns it as a unique_ptr.
// NOTE(review): the signature (original lines 618-619) is missing from this
// listing; the Doxygen index shows `std::unique_ptr< SHAMapStore >
// make_SHAMapStore(Application &app, NodeStore::Scheduler &scheduler,
// beast::Journal journal)`.
620{
621 return std::make_unique<SHAMapStoreImp>(app, scheduler, journal);
622}
623
624} // namespace xrpl
T bind(T... args)
A generic endpoint for log messages.
Definition Journal.h:40
Stream error() const
Definition Journal.h:319
Stream debug() const
Definition Journal.h:301
Stream trace() const
Severity stream access functions.
Definition Journal.h:295
Stream warn() const
Definition Journal.h:313
virtual Config & config()=0
Holds unparsed configuration information.
void legacy(std::string const &section, std::string value)
Set a value that is not a key/value pair.
Section & section(std::string const &name)
Returns the section with the given name.
bool useTxTables() const
Definition Config.h:322
int getValueFor(SizedItem item, std::optional< std::size_t > node=std::nullopt) const
Retrieve the default value for the item at the specified node size.
Definition Config.cpp:1150
virtual std::shared_ptr< TreeNodeCache > getTreeNodeCache()=0
Return a pointer to the Family Tree Node Cache.
virtual std::shared_ptr< FullBelowCache > getFullBelowCache()=0
Return a pointer to the Family Full Below Cache.
std::optional< LedgerIndex > minSqlSeq()
std::chrono::seconds getValidatedLedgerAge()
void clearPriorLedgers(LedgerIndex seq)
void clearLedgerCachePrior(LedgerIndex seq)
virtual OperatingMode getOperatingMode() const =0
virtual std::string strOperatingMode(OperatingMode const mode, bool const admin=false) const =0
virtual void rotate(std::unique_ptr< NodeStore::Backend > &&newBackend, std::function< void(std::string const &writableName, std::string const &archiveName)> const &f)=0
Rotates the backends.
Persistency layer for NodeObject.
Definition Database.h:31
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq=0, FetchType fetchType=FetchType::synchronous, bool duplicate=false)
Fetch a node object.
Definition Database.cpp:209
static Manager & instance()
Returns the instance of the manager singleton.
virtual std::unique_ptr< Database > make_Database(std::size_t burstSize, Scheduler &scheduler, int readThreads, Section const &backendParameters, beast::Journal journal)=0
Construct a NodeStore database.
virtual std::unique_ptr< Backend > make_Backend(Section const &parameters, std::size_t burstSize, Scheduler &scheduler, beast::Journal journal)=0
Create a backend.
Scheduling for asynchronous backend activity.
uint256 const & as_uint256() const
Definition SHAMapHash.h:24
void setState(SavedState const &state)
void init(BasicConfig const &config, std::string const &dbName)
LedgerIndex setCanDelete(LedgerIndex canDelete)
void clearSql(LedgerIndex lastRotated, std::string const &TableName, std::function< std::optional< LedgerIndex >()> const &getMinSeq, std::function< void(LedgerIndex)> const &deleteBeforeSeq)
delete from sqlite table in batches to not lock the db excessively.
bool copyNode(std::uint64_t &nodeCount, SHAMapTreeNode const &node)
std::unique_ptr< NodeStore::Backend > makeBackendRotating(std::string path=std::string())
std::atomic< bool > working_
std::condition_variable cond_
std::uint32_t deleteBatch_
std::atomic< LedgerIndex > minimumOnline_
std::chrono::seconds recoveryWaitTime_
If the node is out of sync during an online_delete healthWait() call, sleep the thread for this time,...
std::optional< LedgerIndex > minimumOnline() const override
The minimum ledger to try and maintain in our database.
static constexpr auto nodeStoreName_
std::uint32_t deleteInterval_
int fdRequired() const override
Returns the number of file descriptors that are needed.
static std::uint32_t const minimumDeletionInterval_
std::unique_ptr< NodeStore::Database > makeNodeStore(int readThreads) override
std::chrono::milliseconds backOff_
std::atomic< LedgerIndex > canDelete_
NodeStore::DatabaseRotating * dbRotating_
std::shared_ptr< Ledger const > newLedger_
std::string const dbName_
std::condition_variable rendezvous_
static std::uint32_t const minimumDeletionIntervalSA_
std::uint64_t const checkHealthInterval_
HealthResult healthWait()
void clearCaches(LedgerIndex validatedSeq)
std::chrono::seconds ageThreshold_
void rendezvous() const override
LedgerMaster * ledgerMaster_
beast::Journal const journal_
std::string const dbPrefix_
NodeStore::Scheduler & scheduler_
HealthResult
This is a health check for online deletion that waits until rippled is stable before returning.
void onLedgerClosed(std::shared_ptr< Ledger const > const &ledger) override
Called by LedgerMaster every time a ledger validates.
bool freshenCache(CacheInstance &cache)
void clearPrior(LedgerIndex lastRotated)
SHAMapStoreImp(Application &app, NodeStore::Scheduler &scheduler, beast::Journal journal)
SHAMapHash const & getHash() const
Return the hash of this node.
Holds a collection of configuration values.
Definition BasicConfig.h:24
virtual RelationalDatabase & getRelationalDatabase()=0
virtual beast::Journal getJournal(std::string const &name)=0
virtual NetworkOPs & getOPs()=0
virtual TransactionMaster & getMasterTransaction()=0
virtual LedgerMaster & getLedgerMaster()=0
virtual Family & getNodeFamily()=0
TaggedCache< uint256, Transaction > & getCache()
T compare(T... args)
T count(T... args)
T empty(T... args)
T is_same_v
T join(T... args)
T joinable(T... args)
T max(T... args)
T min(T... args)
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
STL namespace.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
void initStateDB(soci::session &session, BasicConfig const &config, std::string const &dbName)
initStateDB Opens a session with the State database.
Definition State.cpp:6
void setSavedState(soci::session &session, SavedState const &state)
setSavedState Saves the given state.
Definition State.cpp:83
T get(Section const &section, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.
void setLastRotated(soci::session &session, LedgerIndex seq)
setLastRotated Updates the last rotated ledger sequence.
Definition State.cpp:94
std::unique_ptr< SHAMapStore > make_SHAMapStore(Application &app, NodeStore::Scheduler &scheduler, beast::Journal journal)
SavedState getSavedState(soci::session &session)
getSavedState Returns the saved state.
Definition State.cpp:72
constexpr auto megabytes(T value) noexcept
bool get_if_exists(Section const &section, std::string const &name, T &v)
LedgerIndex setCanDelete(soci::session &session, LedgerIndex canDelete)
setCanDelete Updates the ledger sequence which can be deleted.
Definition State.cpp:64
OperatingMode
Specifies the mode under which the server believes it's operating.
Definition NetworkOPs.h:50
@ FULL
we have the ledger and can even validate
LedgerIndex getCanDelete(soci::session &session)
getCanDelete Returns the ledger sequence which can be deleted.
Definition State.cpp:55
T push_back(T... args)
T ref(T... args)
T reset(T... args)
T sleep_for(T... args)
static std::string nodeDatabase()
std::string writableDb
Definition State.h:13
LedgerIndex lastRotated
Definition State.h:15
std::string archiveDb
Definition State.h:14
T to_string(T... args)
T what(T... args)