rippled
Loading...
Searching...
No Matches
DatabaseNodeImp.cpp
1#include <xrpl/nodestore/detail/DatabaseNodeImp.h>
2
3namespace ripple {
4namespace NodeStore {
5
6void
9 Blob&& data,
10 uint256 const& hash,
12{
13 storeStats(1, data.size());
14
15 auto obj = NodeObject::createObject(type, std::move(data), hash);
16 backend_->store(obj);
17 if (cache_)
18 {
19 // After the store, replace a negative cache entry if there is one
20 cache_->canonicalize(
21 hash, obj, [](std::shared_ptr<NodeObject> const& n) {
22 return n->getType() == hotDUMMY;
23 });
24 }
25}
26
27void
29 uint256 const& hash,
30 std::uint32_t ledgerSeq,
31 std::function<void(std::shared_ptr<NodeObject> const&)>&& callback)
32{
33 if (cache_)
34 {
35 std::shared_ptr<NodeObject> obj = cache_->fetch(hash);
36 if (obj)
37 {
38 callback(obj->getType() == hotDUMMY ? nullptr : obj);
39 return;
40 }
41 }
42 Database::asyncFetch(hash, ledgerSeq, std::move(callback));
43}
44
45void
47{
48 if (cache_)
49 cache_->sweep();
50}
51
54 uint256 const& hash,
56 FetchReport& fetchReport,
57 bool duplicate)
58{
60 cache_ ? cache_->fetch(hash) : nullptr;
61
62 if (!nodeObject)
63 {
64 JLOG(j_.trace()) << "fetchNodeObject " << hash << ": record not "
65 << (cache_ ? "cached" : "found");
66
67 Status status;
68
69 try
70 {
71 status = backend_->fetch(hash.data(), &nodeObject);
72 }
73 catch (std::exception const& e)
74 {
75 JLOG(j_.fatal())
76 << "fetchNodeObject " << hash
77 << ": Exception fetching from backend: " << e.what();
78 Rethrow();
79 }
80
81 switch (status)
82 {
83 case ok:
84 if (cache_)
85 {
86 if (nodeObject)
87 cache_->canonicalize_replace_client(hash, nodeObject);
88 else
89 {
90 auto notFound =
92 cache_->canonicalize_replace_client(hash, notFound);
93 if (notFound->getType() != hotDUMMY)
94 nodeObject = notFound;
95 }
96 }
97 break;
98 case notFound:
99 break;
100 case dataCorrupt:
101 JLOG(j_.fatal()) << "fetchNodeObject " << hash
102 << ": nodestore data is corrupted";
103 break;
104 default:
105 JLOG(j_.warn())
106 << "fetchNodeObject " << hash
107 << ": backend returns unknown result " << status;
108 break;
109 }
110 }
111 else
112 {
113 JLOG(j_.trace()) << "fetchNodeObject " << hash
114 << ": record found in cache";
115 if (nodeObject->getType() == hotDUMMY)
116 nodeObject.reset();
117 }
118
119 if (nodeObject)
120 fetchReport.wasFound = true;
121
122 return nodeObject;
123}
124
127{
129 using namespace std::chrono;
130 auto const before = steady_clock::now();
132 std::vector<uint256 const*> cacheMisses;
133 uint64_t hits = 0;
134 uint64_t fetches = 0;
135 for (size_t i = 0; i < hashes.size(); ++i)
136 {
137 auto const& hash = hashes[i];
138 // See if the object already exists in the cache
139 auto nObj = cache_ ? cache_->fetch(hash) : nullptr;
140 ++fetches;
141 if (!nObj)
142 {
143 // Try the database
144 indexMap[&hash] = i;
145 cacheMisses.push_back(&hash);
146 }
147 else
148 {
149 results[i] = nObj->getType() == hotDUMMY ? nullptr : nObj;
150 // It was in the cache.
151 ++hits;
152 }
153 }
154
155 JLOG(j_.debug()) << "fetchBatch - cache hits = "
156 << (hashes.size() - cacheMisses.size())
157 << " - cache misses = " << cacheMisses.size();
158 auto dbResults = backend_->fetchBatch(cacheMisses).first;
159
160 for (size_t i = 0; i < dbResults.size(); ++i)
161 {
162 auto nObj = std::move(dbResults[i]);
163 size_t index = indexMap[cacheMisses[i]];
164 auto const& hash = hashes[index];
165
166 if (nObj)
167 {
168 // Ensure all threads get the same object
169 if (cache_)
170 cache_->canonicalize_replace_client(hash, nObj);
171 }
172 else
173 {
174 JLOG(j_.error())
175 << "fetchBatch - "
176 << "record not found in db or cache. hash = " << strHex(hash);
177 if (cache_)
178 {
180 cache_->canonicalize_replace_client(hash, notFound);
181 if (notFound->getType() != hotDUMMY)
182 nObj = std::move(notFound);
183 }
184 }
185 results[index] = std::move(nObj);
186 }
187
188 auto fetchDurationUs =
189 std::chrono::duration_cast<std::chrono::microseconds>(
190 steady_clock::now() - before)
191 .count();
192 updateFetchMetrics(fetches, hits, fetchDurationUs);
193 return results;
194}
195
196} // namespace NodeStore
197} // namespace ripple
Stream fatal() const
Definition Journal.h:333
Stream error() const
Definition Journal.h:327
Stream debug() const
Definition Journal.h:309
Stream trace() const
Severity stream access functions.
Definition Journal.h:303
Stream warn() const
Definition Journal.h:321
static std::shared_ptr< NodeObject > createObject(NodeObjectType type, Blob &&data, uint256 const &hash)
Create an object from fields.
std::shared_ptr< Backend > backend_
std::shared_ptr< TaggedCache< uint256, NodeObject > > cache_
std::vector< std::shared_ptr< NodeObject > > fetchBatch(std::vector< uint256 > const &hashes)
void store(NodeObjectType type, Blob &&data, uint256 const &hash, std::uint32_t) override
Store the object.
void asyncFetch(uint256 const &hash, std::uint32_t ledgerSeq, std::function< void(std::shared_ptr< NodeObject > const &)> &&callback) override
Fetch an object without waiting.
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t, FetchReport &fetchReport, bool duplicate) override
void sweep() override
Remove expired entries from the positive and negative caches.
void storeStats(std::uint64_t count, std::uint64_t sz)
Definition Database.h:229
virtual void asyncFetch(uint256 const &hash, std::uint32_t ledgerSeq, std::function< void(std::shared_ptr< NodeObject > const &)> &&callback)
Fetch an object without waiting.
Definition Database.cpp:165
beast::Journal const j_
Definition Database.h:208
void updateFetchMetrics(uint64_t fetches, uint64_t hits, uint64_t duration)
Definition Database.h:243
Status
Return codes from Backend operations.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
NodeObjectType
The types of node objects.
Definition NodeObject.h:13
@ hotDUMMY
Definition NodeObject.h:18
std::string strHex(FwdIt begin, FwdIt end)
Definition strHex.h:11
void Rethrow()
Rethrow the exception currently being handled.
Definition contract.h:29
T push_back(T... args)
T reset(T... args)
T size(T... args)
Contains information about a fetch operation.
T what(T... args)