xrpld
Loading...
Searching...
No Matches
SociDB.cpp
1#include <xrpl/basics/Log.h>
2#include <xrpl/config/BasicConfig.h>
3#include <xrpl/config/Constants.h>
4#include <xrpl/core/Job.h>
5#include <xrpl/core/JobQueue.h>
6#include <xrpl/core/ServiceRegistry.h>
7
8#include <boost/filesystem/operations.hpp>
9#include <boost/filesystem/path.hpp>
10
11#include <soci/blob.h>
12
13#include <cstddef>
14#include <cstdint>
15#include <mutex>
16#include <stdexcept>
17#include <string>
18#include <utility>
19#include <vector>
20#if defined(__clang__)
21#pragma clang diagnostic push
22#pragma clang diagnostic ignored "-Wdeprecated"
23#endif
24
25#include <xrpl/basics/ByteUtilities.h>
26#include <xrpl/basics/contract.h>
27#include <xrpl/rdb/DatabaseCon.h>
28#include <xrpl/rdb/SociDB.h>
29
30#include <soci/sqlite3/soci-sqlite3.h> // IWYU pragma: keep
31
32#include <memory>
33
34namespace xrpl {
35
// WAL size threshold: the SQLite WAL hook schedules a checkpoint once the
// walSize value it is handed reaches this number. The value is only ever
// read, so it is a compile-time constant; the type is spelled out because it
// is compared against the hook's `int walSize` argument.
static constexpr int gCheckpointPageCount = 1000;
37
38namespace detail {
39
41getSociSqliteInit(std::string const& name, std::string const& dir, std::string const& ext)
42{
43 if (name.empty())
44 {
46 "Sqlite databases must specify a dir and a name. Name: " + name + " Dir: " + dir);
47 }
48 boost::filesystem::path file(dir);
49 if (is_directory(file))
50 file /= name + ext;
51 return file.string();
52}
53
55getSociInit(BasicConfig const& config, std::string const& dbName)
56{
57 auto const& section = config.section(Sections::kSqdb);
58 auto const backendName = get(section, Keys::kBackend, "sqlite");
59
60 if (backendName != "sqlite")
61 Throw<std::runtime_error>("Unsupported soci backend: " + backendName);
62
63 auto const path = config.legacy(Sections::kDatabasePath);
64 auto const ext = dbName == "validators" || dbName == "peerfinder" ? ".sqlite" : ".db";
65 return detail::getSociSqliteInit(dbName, path, ext);
66}
67
68} // namespace detail
69
71{
72}
73
74DBConfig::DBConfig(BasicConfig const& config, std::string const& dbName)
75 : DBConfig(detail::getSociInit(config, dbName))
76{
77}
78
84
85void
86DBConfig::open(soci::session& s) const
87{
88 s.open(soci::sqlite3, connectionString());
89}
90
91void
92open(soci::session& s, BasicConfig const& config, std::string const& dbName)
93{
94 DBConfig(config, dbName).open(s);
95}
96
97void
98open(soci::session& s, std::string const& beName, std::string const& connectionString)
99{
100 if (beName == "sqlite")
101 {
102 s.open(soci::sqlite3, connectionString);
103 }
104 else
105 {
106 Throw<std::runtime_error>("Unsupported soci backend: " + beName);
107 }
108}
109
110static sqlite_api::sqlite3*
111getConnection(soci::session& s)
112{
113 sqlite_api::sqlite3* result = nullptr; // NOLINT(misc-const-correctness)
114 auto be = s.get_backend();
115 if (auto b = dynamic_cast<soci::sqlite3_session_backend*>(be))
116 result = b->conn_;
117
118 if (result == nullptr)
119 Throw<std::logic_error>("Didn't get a database connection.");
120
121 return result;
122}
123
125getKBUsedAll(soci::session& s)
126{
127 if (getConnection(s) == nullptr)
128 Throw<std::logic_error>("No connection found.");
129 return static_cast<size_t>(sqlite_api::sqlite3_memory_used() / kilobytes(1));
130}
131
133getKBUsedDB(soci::session& s)
134{
135 // This function will have to be customized when other backends are added
136 if (auto conn = getConnection(s))
137 {
138 int cur = 0, hiw = 0;
139 sqlite_api::sqlite3_db_status(conn, SQLITE_DBSTATUS_CACHE_USED, &cur, &hiw, 0);
140 return cur / kilobytes(1);
141 }
143 return 0; // Silence compiler warning.
144}
145
146void
147convert(soci::blob& from, std::vector<std::uint8_t>& to)
148{
149 to.resize(from.get_len());
150 if (to.empty())
151 return;
152 from.read(0, reinterpret_cast<char*>(&to[0]), from.get_len());
153}
154
155void
156convert(soci::blob& from, std::string& to)
157{
159 convert(from, tmp);
160 to.assign(tmp.begin(), tmp.end());
161}
162
163void
164convert(std::vector<std::uint8_t> const& from, soci::blob& to)
165{
166 if (!from.empty())
167 {
168 to.write(0, reinterpret_cast<char const*>(&from[0]), from.size());
169 }
170 else
171 {
172 to.trim(0);
173 }
174}
175
176void
177convert(std::string const& from, soci::blob& to)
178{
179 if (!from.empty())
180 {
181 to.write(0, from.data(), from.size());
182 }
183 else
184 {
185 to.trim(0);
186 }
187}
188
189namespace {
190
199
200class WALCheckpointer : public Checkpointer
201{
202public:
203 WALCheckpointer(
206 JobQueue& q,
207 ServiceRegistry& registry)
208 : id_(id)
209 , session_(std::move(session))
210 , jobQueue_(q)
211 , j_(registry.getJournal("WALCheckpointer"))
212 {
213 if (auto [conn, keepAlive] = getConnection(); conn)
214 {
215 (void)keepAlive;
216 sqlite_api::sqlite3_wal_hook(conn, &sqliteWALHook, reinterpret_cast<void*>(id_));
217 }
218 }
219
220 std::pair<sqlite_api::sqlite3*, std::shared_ptr<soci::session>>
221 getConnection() const
222 {
223 if (auto p = session_.lock())
224 {
225 return {xrpl::getConnection(*p), p};
226 }
227 return {nullptr, std::shared_ptr<soci::session>{}};
228 }
229
230 std::uintptr_t
231 id() const override
232 {
233 return id_;
234 }
235
236 ~WALCheckpointer() override = default;
237
238 void
239 schedule() override
240 {
241 {
242 std::scoped_lock const lock(mutex_);
243 if (running_)
244 return;
245 running_ = true;
246 }
247
248 // If the Job is not added to the JobQueue then we're not running_.
249 if (!jobQueue_.addJob(
250 JtWal,
251 "WAL",
252 // If the owning DatabaseCon is destroyed, no need to checkpoint
253 // or keep the checkpointer alive so use a weak_ptr to this.
254 // There is a separate check in `checkpoint` for a valid
255 // connection in the rare case when the DatabaseCon is destroyed
256 // after locking this weak_ptr
257 [wp = std::weak_ptr<Checkpointer>{shared_from_this()}]() {
258 if (auto self = wp.lock())
259 self->checkpoint();
260 }))
261 {
262 std::scoped_lock const lock(mutex_);
263 running_ = false;
264 }
265 }
266
267 void
268 checkpoint() override
269 {
270 auto [conn, keepAlive] = getConnection();
271 (void)keepAlive;
272 if (conn == nullptr)
273 return;
274
275 int log = 0, ckpt = 0;
276 int const ret = sqlite_api::sqlite3_wal_checkpoint_v2(
277 conn, nullptr, SQLITE_CHECKPOINT_PASSIVE, &log, &ckpt);
278
279 auto fname = sqlite_api::sqlite3_db_filename(conn, "main");
280 if (ret != SQLITE_OK)
281 {
282 auto jm = (ret == SQLITE_LOCKED) ? j_.trace() : j_.warn();
283 JLOG(jm) << "WAL(" << fname << "): error " << ret;
284 }
285 else
286 {
287 JLOG(j_.trace()) << "WAL(" << fname << "): frames=" << log << ", written=" << ckpt;
288 }
289
290 std::scoped_lock const lock(mutex_);
291 running_ = false;
292 }
293
294protected:
295 std::uintptr_t const id_;
296 // session is owned by the DatabaseCon parent that holds the checkpointer.
297 // It is possible (though rare) for the DatabaseCon class to be destroyed
298 // before the checkpointer.
299 std::weak_ptr<soci::session> session_;
300 std::mutex mutex_;
301 JobQueue& jobQueue_;
302
303 bool running_ = false;
304 beast::Journal const j_;
305
306 static int
307 sqliteWALHook(void* cpId, sqlite_api::sqlite3* conn, char const* dbName, int walSize)
308 {
309 if (walSize >= gCheckpointPageCount)
310 {
311 if (auto checkpointer = checkpointerFromId(reinterpret_cast<std::uintptr_t>(cpId)))
312 {
313 checkpointer->schedule();
314 }
315 else
316 {
317 sqlite_api::sqlite3_wal_hook(conn, nullptr, nullptr);
318 }
319 }
320 return SQLITE_OK;
321 }
322};
323
324} // namespace
325
326std::shared_ptr<Checkpointer>
330 JobQueue& queue,
331 ServiceRegistry& registry)
332{
333 return std::make_shared<WALCheckpointer>(id, std::move(session), queue, registry);
334}
335
336} // namespace xrpl
337
338#if defined(__clang__)
339#pragma clang diagnostic pop
340#endif
T assign(T... args)
T begin(T... args)
Holds unparsed configuration information.
void legacy(std::string const &section, std::string value)
Set a value that is not a key/value pair.
Section & section(std::string const &name)
Returns the section with the given name.
DBConfig is used when a client wants to delay opening a soci::session after parsing the config parameters.
Definition SociDB.h:40
void open(soci::session &s) const
Definition SociDB.cpp:86
std::string connectionString() const
Definition SociDB.cpp:80
DBConfig(std::string dbPath)
Definition SociDB.cpp:70
std::string connectionString_
Definition SociDB.h:41
A pool of threads to perform work.
Definition JobQueue.h:43
Service registry for dependency injection.
T data(T... args)
T empty(T... args)
T end(T... args)
T lock(T... args)
T log(T... args)
T make_shared(T... args)
STL namespace.
std::string getSociSqliteInit(std::string const &name, std::string const &dir, std::string const &ext)
Definition SociDB.cpp:41
std::string getSociInit(BasicConfig const &config, std::string const &dbName)
Definition SociDB.cpp:55
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
T get(Section const &section, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.
std::uint32_t getKBUsedDB(soci::session &s)
Definition SociDB.cpp:133
static auto gCheckpointPageCount
Definition SociDB.cpp:36
static sqlite_api::sqlite3 * getConnection(soci::session &s)
Definition SociDB.cpp:111
std::uint32_t getKBUsedAll(soci::session &s)
Definition SociDB.cpp:125
void open(soci::session &s, BasicConfig const &config, std::string const &dbName)
Open a soci session.
Definition SociDB.cpp:92
@ JtWal
Definition Job.h:51
constexpr auto kilobytes(T value) noexcept
std::shared_ptr< Checkpointer > makeCheckpointer(std::uintptr_t id, std::weak_ptr< soci::session >, JobQueue &, ServiceRegistry &)
Returns a new checkpointer which makes checkpoints of a soci database every checkpointPageCount pages...
Definition SociDB.cpp:327
void convert(soci::blob &from, std::vector< std::uint8_t > &to)
Definition SociDB.cpp:147
std::shared_ptr< Checkpointer > checkpointerFromId(std::uintptr_t id)
XRPL_NO_SANITIZE_ADDRESS void Throw(Args &&... args)
Definition contract.h:49
T resize(T... args)
T size(T... args)
static constexpr auto kBackend
Definition Constants.h:92
static constexpr auto kSqdb
Definition Constants.h:59
static constexpr auto kDatabasePath
Definition Constants.h:13