Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
Schema.hpp
1#pragma once
2
3#include "data/cassandra/Concepts.hpp"
4#include "data/cassandra/Handle.hpp"
5#include "data/cassandra/Types.hpp"
6#include "util/log/Logger.hpp"
7
8#include <boost/json/string.hpp>
9#include <fmt/compile.h>
10
11#include <functional>
12#include <string>
13#include <string_view>
14#include <vector>
15
16namespace data::cassandra {
17
26template <SomeSettingsProvider SettingsProviderType>
27[[nodiscard]] std::string inline qualifiedTableName(
28 SettingsProviderType const& provider,
29 std::string_view name
30)
31{
32 return fmt::format(
33 "{}.{}{}", provider.getKeyspace(), provider.getTablePrefix().value_or(""), name
34 );
35}
36
40template <SomeSettingsProvider SettingsProviderType>
41class Schema {
42protected:
43 util::Logger log_{"Backend"};
44 std::reference_wrapper<SettingsProviderType const> settingsProvider_;
45
46public:
47 virtual ~Schema() = default;
48
54 explicit Schema(SettingsProviderType const& settingsProvider)
55 : settingsProvider_{std::cref(settingsProvider)}
56 {
57 }
58
59 std::string createKeyspace = [this]() {
60 return fmt::format(
61 R"(
62 CREATE KEYSPACE IF NOT EXISTS {}
63 WITH replication = {{
64 'class': 'SimpleStrategy',
65 'replication_factor': '{}'
66 }}
67 AND durable_writes = True
68 )",
69 settingsProvider_.get().getKeyspace(),
70 settingsProvider_.get().getReplicationFactor()
71 );
72 }();
73
74 // =======================
75 // Schema creation queries
76 // =======================
77
78 std::vector<Statement> createSchema = [this]() {
79 std::vector<Statement> statements;
80
81 statements.emplace_back(
82 fmt::format(
83 R"(
84 CREATE TABLE IF NOT EXISTS {}
85 (
86 key blob,
87 sequence bigint,
88 object blob,
89 PRIMARY KEY (key, sequence)
90 )
91 WITH CLUSTERING ORDER BY (sequence DESC)
92 )",
93 qualifiedTableName(settingsProvider_.get(), "objects")
94 )
95 );
96
97 statements.emplace_back(
98 fmt::format(
99 R"(
100 CREATE TABLE IF NOT EXISTS {}
101 (
102 hash blob PRIMARY KEY,
103 ledger_sequence bigint,
104 date bigint,
105 transaction blob,
106 metadata blob
107 )
108 )",
109 qualifiedTableName(settingsProvider_.get(), "transactions")
110 )
111 );
112
113 statements.emplace_back(
114 fmt::format(
115 R"(
116 CREATE TABLE IF NOT EXISTS {}
117 (
118 ledger_sequence bigint,
119 hash blob,
120 PRIMARY KEY (ledger_sequence, hash)
121 )
122 )",
123 qualifiedTableName(settingsProvider_.get(), "ledger_transactions")
124 )
125 );
126
127 statements.emplace_back(
128 fmt::format(
129 R"(
130 CREATE TABLE IF NOT EXISTS {}
131 (
132 key blob,
133 seq bigint,
134 next blob,
135 PRIMARY KEY (key, seq)
136 )
137 )",
138 qualifiedTableName(settingsProvider_.get(), "successor")
139 )
140 );
141
142 statements.emplace_back(
143 fmt::format(
144 R"(
145 CREATE TABLE IF NOT EXISTS {}
146 (
147 seq bigint,
148 key blob,
149 PRIMARY KEY (seq, key)
150 )
151 )",
152 qualifiedTableName(settingsProvider_.get(), "diff")
153 )
154 );
155
156 statements.emplace_back(
157 fmt::format(
158 R"(
159 CREATE TABLE IF NOT EXISTS {}
160 (
161 account blob,
162 seq_idx tuple<bigint, bigint>,
163 hash blob,
164 PRIMARY KEY (account, seq_idx)
165 )
166 WITH CLUSTERING ORDER BY (seq_idx DESC)
167 )",
168 qualifiedTableName(settingsProvider_.get(), "account_tx")
169 )
170 );
171
172 statements.emplace_back(
173 fmt::format(
174 R"(
175 CREATE TABLE IF NOT EXISTS {}
176 (
177 sequence bigint PRIMARY KEY,
178 header blob
179 )
180 )",
181 qualifiedTableName(settingsProvider_.get(), "ledgers")
182 )
183 );
184
185 statements.emplace_back(
186 fmt::format(
187 R"(
188 CREATE TABLE IF NOT EXISTS {}
189 (
190 hash blob PRIMARY KEY,
191 sequence bigint
192 )
193 )",
194 qualifiedTableName(settingsProvider_.get(), "ledger_hashes")
195 )
196 );
197
198 statements.emplace_back(
199 fmt::format(
200 R"(
201 CREATE TABLE IF NOT EXISTS {}
202 (
203 is_latest boolean PRIMARY KEY,
204 sequence bigint
205 )
206 )",
207 qualifiedTableName(settingsProvider_.get(), "ledger_range")
208 )
209 );
210
211 statements.emplace_back(
212 fmt::format(
213 R"(
214 CREATE TABLE IF NOT EXISTS {}
215 (
216 token_id blob,
217 sequence bigint,
218 owner blob,
219 is_burned boolean,
220 PRIMARY KEY (token_id, sequence)
221 )
222 WITH CLUSTERING ORDER BY (sequence DESC)
223 )",
224 qualifiedTableName(settingsProvider_.get(), "nf_tokens")
225 )
226 );
227
228 statements.emplace_back(
229 fmt::format(
230 R"(
231 CREATE TABLE IF NOT EXISTS {}
232 (
233 issuer blob,
234 taxon bigint,
235 token_id blob,
236 PRIMARY KEY (issuer, taxon, token_id)
237 )
238 WITH CLUSTERING ORDER BY (taxon ASC, token_id ASC)
239 )",
240 qualifiedTableName(settingsProvider_.get(), "issuer_nf_tokens_v2")
241 )
242 );
243
244 statements.emplace_back(
245 fmt::format(
246 R"(
247 CREATE TABLE IF NOT EXISTS {}
248 (
249 token_id blob,
250 sequence bigint,
251 uri blob,
252 PRIMARY KEY (token_id, sequence)
253 )
254 WITH CLUSTERING ORDER BY (sequence DESC)
255 )",
256 qualifiedTableName(settingsProvider_.get(), "nf_token_uris")
257 )
258 );
259
260 statements.emplace_back(
261 fmt::format(
262 R"(
263 CREATE TABLE IF NOT EXISTS {}
264 (
265 token_id blob,
266 seq_idx tuple<bigint, bigint>,
267 hash blob,
268 PRIMARY KEY (token_id, seq_idx)
269 )
270 WITH CLUSTERING ORDER BY (seq_idx DESC)
271 )",
272 qualifiedTableName(settingsProvider_.get(), "nf_token_transactions")
273 )
274 );
275
276 statements.emplace_back(
277 fmt::format(
278 R"(
279 CREATE TABLE IF NOT EXISTS {}
280 (
281 mpt_id blob,
282 holder blob,
283 PRIMARY KEY (mpt_id, holder)
284 )
285 WITH CLUSTERING ORDER BY (holder ASC)
286 )",
287 qualifiedTableName(settingsProvider_.get(), "mp_token_holders")
288 )
289 );
290
291 statements.emplace_back(
292 fmt::format(
293 R"(
294 CREATE TABLE IF NOT EXISTS {}
295 (
296 migrator_name TEXT,
297 status TEXT,
298 PRIMARY KEY (migrator_name)
299 )
300 )",
301 qualifiedTableName(settingsProvider_.get(), "migrator_status")
302 )
303 );
304
305 statements.emplace_back(
306 fmt::format(
307 R"(
308 CREATE TABLE IF NOT EXISTS {}
309 (
310 node_id UUID,
311 message TEXT,
312 PRIMARY KEY (node_id)
313 )
314 WITH default_time_to_live = 2
315 )",
316 qualifiedTableName(settingsProvider_.get(), "nodes_chat")
317 )
318 );
319
320 return statements;
321 }();
322
327 protected:
328 std::reference_wrapper<SettingsProviderType const> settingsProvider_;
329 std::reference_wrapper<Handle const> handle_;
330
331 public:
338 Statements(SettingsProviderType const& settingsProvider, Handle const& handle)
339 : settingsProvider_{settingsProvider}, handle_{std::cref(handle)}
340 {
341 }
342
343 //
344 // Insert queries
345 //
346
347 PreparedStatement insertObject = [this]() {
348 return handle_.get().prepare(
349 fmt::format(
350 R"(
351 INSERT INTO {}
352 (key, sequence, object)
353 VALUES (?, ?, ?)
354 )",
355 qualifiedTableName(settingsProvider_.get(), "objects")
356 )
357 );
358 }();
359
360 PreparedStatement insertTransaction = [this]() {
361 return handle_.get().prepare(
362 fmt::format(
363 R"(
364 INSERT INTO {}
365 (hash, ledger_sequence, date, transaction, metadata)
366 VALUES (?, ?, ?, ?, ?)
367 )",
368 qualifiedTableName(settingsProvider_.get(), "transactions")
369 )
370 );
371 }();
372
373 PreparedStatement insertLedgerTransaction = [this]() {
374 return handle_.get().prepare(
375 fmt::format(
376 R"(
377 INSERT INTO {}
378 (ledger_sequence, hash)
379 VALUES (?, ?)
380 )",
381 qualifiedTableName(settingsProvider_.get(), "ledger_transactions")
382 )
383 );
384 }();
385
386 PreparedStatement insertSuccessor = [this]() {
387 return handle_.get().prepare(
388 fmt::format(
389 R"(
390 INSERT INTO {}
391 (key, seq, next)
392 VALUES (?, ?, ?)
393 )",
394 qualifiedTableName(settingsProvider_.get(), "successor")
395 )
396 );
397 }();
398
399 PreparedStatement insertDiff = [this]() {
400 return handle_.get().prepare(
401 fmt::format(
402 R"(
403 INSERT INTO {}
404 (seq, key)
405 VALUES (?, ?)
406 )",
407 qualifiedTableName(settingsProvider_.get(), "diff")
408 )
409 );
410 }();
411
412 PreparedStatement insertAccountTx = [this]() {
413 return handle_.get().prepare(
414 fmt::format(
415 R"(
416 INSERT INTO {}
417 (account, seq_idx, hash)
418 VALUES (?, ?, ?)
419 )",
420 qualifiedTableName(settingsProvider_.get(), "account_tx")
421 )
422 );
423 }();
424
425 PreparedStatement insertNFT = [this]() {
426 return handle_.get().prepare(
427 fmt::format(
428 R"(
429 INSERT INTO {}
430 (token_id, sequence, owner, is_burned)
431 VALUES (?, ?, ?, ?)
432 )",
433 qualifiedTableName(settingsProvider_.get(), "nf_tokens")
434 )
435 );
436 }();
437
438 PreparedStatement insertIssuerNFT = [this]() {
439 return handle_.get().prepare(
440 fmt::format(
441 R"(
442 INSERT INTO {}
443 (issuer, taxon, token_id)
444 VALUES (?, ?, ?)
445 )",
446 qualifiedTableName(settingsProvider_.get(), "issuer_nf_tokens_v2")
447 )
448 );
449 }();
450
451 PreparedStatement insertNFTURI = [this]() {
452 return handle_.get().prepare(
453 fmt::format(
454 R"(
455 INSERT INTO {}
456 (token_id, sequence, uri)
457 VALUES (?, ?, ?)
458 )",
459 qualifiedTableName(settingsProvider_.get(), "nf_token_uris")
460 )
461 );
462 }();
463
464 PreparedStatement insertNFTTx = [this]() {
465 return handle_.get().prepare(
466 fmt::format(
467 R"(
468 INSERT INTO {}
469 (token_id, seq_idx, hash)
470 VALUES (?, ?, ?)
471 )",
472 qualifiedTableName(settingsProvider_.get(), "nf_token_transactions")
473 )
474 );
475 }();
476
477 PreparedStatement insertMPTHolder = [this]() {
478 return handle_.get().prepare(
479 fmt::format(
480 R"(
481 INSERT INTO {}
482 (mpt_id, holder)
483 VALUES (?, ?)
484 )",
485 qualifiedTableName(settingsProvider_.get(), "mp_token_holders")
486 )
487 );
488 }();
489
490 PreparedStatement insertLedgerHeader = [this]() {
491 return handle_.get().prepare(
492 fmt::format(
493 R"(
494 INSERT INTO {}
495 (sequence, header)
496 VALUES (?, ?)
497 )",
498 qualifiedTableName(settingsProvider_.get(), "ledgers")
499 )
500 );
501 }();
502
503 PreparedStatement insertLedgerHash = [this]() {
504 return handle_.get().prepare(
505 fmt::format(
506 R"(
507 INSERT INTO {}
508 (hash, sequence)
509 VALUES (?, ?)
510 )",
511 qualifiedTableName(settingsProvider_.get(), "ledger_hashes")
512 )
513 );
514 }();
515
516 //
517 // Update (and "delete") queries
518 //
519
520 PreparedStatement deleteLedgerRange = [this]() {
521 return handle_.get().prepare(
522 fmt::format(
523 R"(
524 UPDATE {}
525 SET sequence = ?
526 WHERE is_latest = False
527 )",
528 qualifiedTableName(settingsProvider_.get(), "ledger_range")
529 )
530 );
531 }();
532
533 PreparedStatement insertMigratorStatus = [this]() {
534 return handle_.get().prepare(
535 fmt::format(
536 R"(
537 INSERT INTO {}
538 (migrator_name, status)
539 VALUES (?, ?)
540 )",
541 qualifiedTableName(settingsProvider_.get(), "migrator_status")
542 )
543 );
544 }();
545
546 PreparedStatement updateClioNodeMessage = [this]() {
547 return handle_.get().prepare(
548 fmt::format(
549 R"(
550 UPDATE {}
551 SET message = ?
552 WHERE node_id = ?
553 )",
554 qualifiedTableName(settingsProvider_.get(), "nodes_chat")
555 )
556 );
557 }();
558
559 //
560 // Select queries
561 //
562
563 PreparedStatement selectSuccessor = [this]() {
564 return handle_.get().prepare(
565 fmt::format(
566 R"(
567 SELECT next
568 FROM {}
569 WHERE key = ?
570 AND seq <= ?
571 ORDER BY seq DESC
572 LIMIT 1
573 )",
574 qualifiedTableName(settingsProvider_.get(), "successor")
575 )
576 );
577 }();
578
579 PreparedStatement selectDiff = [this]() {
580 return handle_.get().prepare(
581 fmt::format(
582 R"(
583 SELECT key
584 FROM {}
585 WHERE seq = ?
586 )",
587 qualifiedTableName(settingsProvider_.get(), "diff")
588 )
589 );
590 }();
591
592 PreparedStatement selectObject = [this]() {
593 return handle_.get().prepare(
594 fmt::format(
595 R"(
596 SELECT object, sequence
597 FROM {}
598 WHERE key = ?
599 AND sequence <= ?
600 ORDER BY sequence DESC
601 LIMIT 1
602 )",
603 qualifiedTableName(settingsProvider_.get(), "objects")
604 )
605 );
606 }();
607
608 PreparedStatement selectTransaction = [this]() {
609 return handle_.get().prepare(
610 fmt::format(
611 R"(
612 SELECT transaction, metadata, ledger_sequence, date
613 FROM {}
614 WHERE hash = ?
615 )",
616 qualifiedTableName(settingsProvider_.get(), "transactions")
617 )
618 );
619 }();
620
621 PreparedStatement selectAllTransactionHashesInLedger = [this]() {
622 return handle_.get().prepare(
623 fmt::format(
624 R"(
625 SELECT hash
626 FROM {}
627 WHERE ledger_sequence = ?
628 )",
629 qualifiedTableName(settingsProvider_.get(), "ledger_transactions")
630 )
631 );
632 }();
633
634 PreparedStatement getToken = [this]() {
635 return handle_.get().prepare(
636 fmt::format(
637 R"(
638 SELECT TOKEN(key)
639 FROM {}
640 WHERE key = ?
641 LIMIT 1
642 )",
643 qualifiedTableName(settingsProvider_.get(), "objects")
644 )
645 );
646 }();
647
648 PreparedStatement selectAccountTx = [this]() {
649 return handle_.get().prepare(
650 fmt::format(
651 R"(
652 SELECT hash, seq_idx
653 FROM {}
654 WHERE account = ?
655 AND seq_idx < ?
656 LIMIT ?
657 )",
658 qualifiedTableName(settingsProvider_.get(), "account_tx")
659 )
660 );
661 }();
662
663 PreparedStatement selectAccountTxForward = [this]() {
664 return handle_.get().prepare(
665 fmt::format(
666 R"(
667 SELECT hash, seq_idx
668 FROM {}
669 WHERE account = ?
670 AND seq_idx > ?
671 ORDER BY seq_idx ASC
672 LIMIT ?
673 )",
674 qualifiedTableName(settingsProvider_.get(), "account_tx")
675 )
676 );
677 }();
678
679 PreparedStatement selectNFT = [this]() {
680 return handle_.get().prepare(
681 fmt::format(
682 R"(
683 SELECT sequence, owner, is_burned
684 FROM {}
685 WHERE token_id = ?
686 AND sequence <= ?
687 ORDER BY sequence DESC
688 LIMIT 1
689 )",
690 qualifiedTableName(settingsProvider_.get(), "nf_tokens")
691 )
692 );
693 }();
694
695 PreparedStatement selectNFTURI = [this]() {
696 return handle_.get().prepare(
697 fmt::format(
698 R"(
699 SELECT uri
700 FROM {}
701 WHERE token_id = ?
702 AND sequence <= ?
703 ORDER BY sequence DESC
704 LIMIT 1
705 )",
706 qualifiedTableName(settingsProvider_.get(), "nf_token_uris")
707 )
708 );
709 }();
710
711 PreparedStatement selectNFTTx = [this]() {
712 return handle_.get().prepare(
713 fmt::format(
714 R"(
715 SELECT hash, seq_idx
716 FROM {}
717 WHERE token_id = ?
718 AND seq_idx < ?
719 ORDER BY seq_idx DESC
720 LIMIT ?
721 )",
722 qualifiedTableName(settingsProvider_.get(), "nf_token_transactions")
723 )
724 );
725 }();
726
727 PreparedStatement selectNFTTxForward = [this]() {
728 return handle_.get().prepare(
729 fmt::format(
730 R"(
731 SELECT hash, seq_idx
732 FROM {}
733 WHERE token_id = ?
734 AND seq_idx >= ?
735 ORDER BY seq_idx ASC
736 LIMIT ?
737 )",
738 qualifiedTableName(settingsProvider_.get(), "nf_token_transactions")
739 )
740 );
741 }();
742
743 PreparedStatement selectNFTIDsByIssuerTaxon = [this]() {
744 return handle_.get().prepare(
745 fmt::format(
746 R"(
747 SELECT token_id
748 FROM {}
749 WHERE issuer = ?
750 AND taxon = ?
751 AND token_id > ?
752 ORDER BY taxon ASC, token_id ASC
753 LIMIT ?
754 )",
755 qualifiedTableName(settingsProvider_.get(), "issuer_nf_tokens_v2")
756 )
757 );
758 }();
759
760 PreparedStatement selectMPTHolders = [this]() {
761 return handle_.get().prepare(
762 fmt::format(
763 R"(
764 SELECT holder
765 FROM {}
766 WHERE mpt_id = ?
767 AND holder > ?
768 ORDER BY holder ASC
769 LIMIT ?
770 )",
771 qualifiedTableName(settingsProvider_.get(), "mp_token_holders")
772 )
773 );
774 }();
775
776 PreparedStatement selectLedgerByHash = [this]() {
777 return handle_.get().prepare(
778 fmt::format(
779 R"(
780 SELECT sequence
781 FROM {}
782 WHERE hash = ?
783 LIMIT 1
784 )",
785 qualifiedTableName(settingsProvider_.get(), "ledger_hashes")
786 )
787 );
788 }();
789
790 PreparedStatement selectLedgerBySeq = [this]() {
791 return handle_.get().prepare(
792 fmt::format(
793 R"(
794 SELECT header
795 FROM {}
796 WHERE sequence = ?
797 )",
798 qualifiedTableName(settingsProvider_.get(), "ledgers")
799 )
800 );
801 }();
802
803 PreparedStatement selectLatestLedger = [this]() {
804 return handle_.get().prepare(
805 fmt::format(
806 R"(
807 SELECT sequence
808 FROM {}
809 WHERE is_latest = True
810 )",
811 qualifiedTableName(settingsProvider_.get(), "ledger_range")
812 )
813 );
814 }();
815
816 PreparedStatement selectLedgerRange = [this]() {
817 return handle_.get().prepare(
818 fmt::format(
819 R"(
820 SELECT sequence
821 FROM {}
822 WHERE is_latest in (True, False)
823 )",
824 qualifiedTableName(settingsProvider_.get(), "ledger_range")
825 )
826 );
827 }();
828
829 PreparedStatement selectMigratorStatus = [this]() {
830 return handle_.get().prepare(
831 fmt::format(
832 R"(
833 SELECT status
834 FROM {}
835 WHERE migrator_name = ?
836 )",
837 qualifiedTableName(settingsProvider_.get(), "migrator_status")
838 )
839 );
840 }();
841
842 PreparedStatement selectClioNodesData = [this]() {
843 return handle_.get().prepare(
844 fmt::format(
845 R"(
846 SELECT node_id, message
847 FROM {}
848 )",
849 qualifiedTableName(settingsProvider_.get(), "nodes_chat")
850 )
851 );
852 }();
853 };
854
860 virtual void
861 prepareStatements(Handle const& handle) = 0;
862};
863
864} // namespace data::cassandra
Represents a handle to the cassandra database cluster.
Definition Handle.hpp:27
Statements(SettingsProviderType const &settingsProvider, Handle const &handle)
Construct a new Statements object.
Definition Schema.hpp:338
Schema(SettingsProviderType const &settingsProvider)
Shared Schema's between all Schema classes (Cassandra and Keyspace).
Definition Schema.hpp:54
virtual void prepareStatements(Handle const &handle)=0
Recreates the prepared statements.
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:77
This namespace implements a wrapper for the Cassandra C++ driver.
Definition CassandraBackendFamily.hpp:47
std::string qualifiedTableName(SettingsProviderType const &provider, std::string_view name)
Returns the table name qualified with the keyspace and table prefix.
Definition Schema.hpp:27