1#include <xrpl/nodestore/Database.h>
3#include <xrpl/basics/Log.h>
4#include <xrpl/basics/base_uint.h>
5#include <xrpl/basics/contract.h>
6#include <xrpl/beast/core/CurrentThreadName.h>
7#include <xrpl/beast/utility/Journal.h>
8#include <xrpl/beast/utility/instrumentation.h>
9#include <xrpl/config/BasicConfig.h>
10#include <xrpl/config/Constants.h>
11#include <xrpl/json/json_forwards.h>
12#include <xrpl/json/json_value.h>
13#include <xrpl/nodestore/Backend.h>
14#include <xrpl/nodestore/NodeObject.h>
15#include <xrpl/nodestore/Scheduler.h>
16#include <xrpl/nodestore/Types.h>
17#include <xrpl/protocol/SystemParameters.h>
18#include <xrpl/protocol/jss.h>
46 XRPL_ASSERT(readThreads,
"xrpl::NodeStore::Database::Database : nonzero threads input");
88 for (
auto it =
read.begin(); it !=
read.end(); ++it)
92 "xrpl::NodeStore::Database::Database : non-empty "
95 auto const& hash = it->first;
96 auto const& data = it->second;
97 auto const seqn = data[0].first;
106 for (
auto const& req : data)
109 (seqn == req.first) ||
isSameDB(req.first, seqn)
149 if (!
readStopping_.exchange(
true, std::memory_order_relaxed))
151 JLOG(
j_.debug()) <<
"Clearing read queue because of stop request";
157 JLOG(
j_.debug()) <<
"Waiting for stop request to complete...";
167 "xrpl::NodeStore::Database::stop : maximum stop duration");
172 <<
"Stop request completed in "
187 read_[
hash].emplace_back(ledgerSeq, std::move(cb));
197 auto storeBatch = [&, fname = __func__]() {
204 JLOG(
j_.error()) <<
"Exception caught in function " << fname <<
". Error: " << e.
what();
209 for (
auto const& nodeObject : batch)
210 sz += nodeObject->getData().size();
216 XRPL_ASSERT(nodeObject,
"xrpl::NodeStore::Database::importInternal : non-null node");
248 fetchSz_ += nodeObject->getData().size();
260 XRPL_ASSERT(obj.
isObject(),
"xrpl::NodeStore::Database::getCountsJson : valid input type");
A generic endpoint for log messages.
A backend used for the NodeStore.
virtual void storeBatch(Batch const &batch)=0
Store a group of objects.
void storeStats(std::uint64_t count, std::uint64_t sz)
std::condition_variable readCondVar_
std::atomic< std::uint64_t > fetchTotalCount_
std::atomic< int > readThreads_
std::atomic< int > runningThreads_
std::map< uint256, std::vector< std::pair< std::uint32_t, std::function< void(std::shared_ptr< NodeObject > const &)> > > > read_
std::atomic< bool > readStopping_
virtual void asyncFetch(uint256 const &hash, std::uint32_t ledgerSeq, std::function< void(std::shared_ptr< NodeObject > const &)> &&callback)
Fetch an object without waiting.
std::atomic< std::uint64_t > fetchDurationUs_
virtual bool isSameDB(std::uint32_t s1, std::uint32_t s2)=0
virtual ~Database()
Destroy the node store.
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq=0, FetchType fetchType=FetchType::Synchronous, bool duplicate=false)
Fetch a node object.
std::atomic< std::uint64_t > storeSz_
std::uint32_t const earliestLedgerSeq_
std::atomic< std::uint32_t > fetchHitCount_
std::atomic< std::uint64_t > storeCount_
void getCountsJson(json::Value &obj)
virtual void forEach(std::function< void(std::shared_ptr< NodeObject >)> f)=0
Visit every object in the database This is usually called during import.
std::atomic< std::uint32_t > fetchSz_
void importInternal(Backend &dstBackend, Database &srcDB)
Scheduling for asynchronous backend activity.
Holds a collection of configuration values.
T duration_cast(T... args)
T emplace_back(T... args)
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
void read(nudb::detail::istream &is, std::size_t &u)
static constexpr auto kBatchWritePreallocationSize
std::vector< std::shared_ptr< NodeObject > > Batch
A batch of NodeObjects to write at once.
T get(Section const §ion, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.
static constexpr std::uint32_t kXrpLedgerEarliestSeq
The XRP ledger network's earliest allowed sequence.
XRPL_NO_SANITIZE_ADDRESS void Throw(Args &&... args)
Contains information about a fetch operation.
std::chrono::milliseconds elapsed