Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
Schema.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2023, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "data/cassandra/Concepts.hpp"
23#include "data/cassandra/Handle.hpp"
24#include "data/cassandra/Types.hpp"
25#include "util/log/Logger.hpp"
26
27#include <fmt/compile.h>
28
29#include <functional>
30#include <memory>
31#include <string>
32#include <string_view>
33#include <vector>
34
35namespace data::cassandra {
36
45template <SomeSettingsProvider SettingsProviderType>
46[[nodiscard]] std::string inline qualifiedTableName(SettingsProviderType const& provider, std::string_view name)
47{
48 return fmt::format("{}.{}{}", provider.getKeyspace(), provider.getTablePrefix().value_or(""), name);
49}
50
54template <SomeSettingsProvider SettingsProviderType>
55class Schema {
56 util::Logger log_{"Backend"};
57 std::reference_wrapper<SettingsProviderType const> settingsProvider_;
58
59public:
65 explicit Schema(SettingsProviderType const& settingsProvider) : settingsProvider_{std::cref(settingsProvider)}
66 {
67 }
68
69 std::string createKeyspace = [this]() {
70 return fmt::format(
71 R"(
72 CREATE KEYSPACE IF NOT EXISTS {}
73 WITH replication = {{
74 'class': 'SimpleStrategy',
75 'replication_factor': '{}'
76 }}
77 AND durable_writes = True
78 )",
79 settingsProvider_.get().getKeyspace(),
80 settingsProvider_.get().getReplicationFactor()
81 );
82 }();
83
84 // =======================
85 // Schema creation queries
86 // =======================
87
88 std::vector<Statement> createSchema = [this]() {
89 std::vector<Statement> statements;
90
91 statements.emplace_back(
92 fmt::format(
93 R"(
94 CREATE TABLE IF NOT EXISTS {}
95 (
96 key blob,
97 sequence bigint,
98 object blob,
99 PRIMARY KEY (key, sequence)
100 )
101 WITH CLUSTERING ORDER BY (sequence DESC)
102 )",
103 qualifiedTableName(settingsProvider_.get(), "objects")
104 )
105 );
106
107 statements.emplace_back(
108 fmt::format(
109 R"(
110 CREATE TABLE IF NOT EXISTS {}
111 (
112 hash blob PRIMARY KEY,
113 ledger_sequence bigint,
114 date bigint,
115 transaction blob,
116 metadata blob
117 )
118 )",
119 qualifiedTableName(settingsProvider_.get(), "transactions")
120 )
121 );
122
123 statements.emplace_back(
124 fmt::format(
125 R"(
126 CREATE TABLE IF NOT EXISTS {}
127 (
128 ledger_sequence bigint,
129 hash blob,
130 PRIMARY KEY (ledger_sequence, hash)
131 )
132 )",
133 qualifiedTableName(settingsProvider_.get(), "ledger_transactions")
134 )
135 );
136
137 statements.emplace_back(
138 fmt::format(
139 R"(
140 CREATE TABLE IF NOT EXISTS {}
141 (
142 key blob,
143 seq bigint,
144 next blob,
145 PRIMARY KEY (key, seq)
146 )
147 )",
148 qualifiedTableName(settingsProvider_.get(), "successor")
149 )
150 );
151
152 statements.emplace_back(
153 fmt::format(
154 R"(
155 CREATE TABLE IF NOT EXISTS {}
156 (
157 seq bigint,
158 key blob,
159 PRIMARY KEY (seq, key)
160 )
161 )",
162 qualifiedTableName(settingsProvider_.get(), "diff")
163 )
164 );
165
166 statements.emplace_back(
167 fmt::format(
168 R"(
169 CREATE TABLE IF NOT EXISTS {}
170 (
171 account blob,
172 seq_idx tuple<bigint, bigint>,
173 hash blob,
174 PRIMARY KEY (account, seq_idx)
175 )
176 WITH CLUSTERING ORDER BY (seq_idx DESC)
177 )",
178 qualifiedTableName(settingsProvider_.get(), "account_tx")
179 )
180 );
181
182 statements.emplace_back(
183 fmt::format(
184 R"(
185 CREATE TABLE IF NOT EXISTS {}
186 (
187 sequence bigint PRIMARY KEY,
188 header blob
189 )
190 )",
191 qualifiedTableName(settingsProvider_.get(), "ledgers")
192 )
193 );
194
195 statements.emplace_back(
196 fmt::format(
197 R"(
198 CREATE TABLE IF NOT EXISTS {}
199 (
200 hash blob PRIMARY KEY,
201 sequence bigint
202 )
203 )",
204 qualifiedTableName(settingsProvider_.get(), "ledger_hashes")
205 )
206 );
207
208 statements.emplace_back(
209 fmt::format(
210 R"(
211 CREATE TABLE IF NOT EXISTS {}
212 (
213 is_latest boolean PRIMARY KEY,
214 sequence bigint
215 )
216 )",
217 qualifiedTableName(settingsProvider_.get(), "ledger_range")
218 )
219 );
220
221 statements.emplace_back(
222 fmt::format(
223 R"(
224 CREATE TABLE IF NOT EXISTS {}
225 (
226 token_id blob,
227 sequence bigint,
228 owner blob,
229 is_burned boolean,
230 PRIMARY KEY (token_id, sequence)
231 )
232 WITH CLUSTERING ORDER BY (sequence DESC)
233 )",
234 qualifiedTableName(settingsProvider_.get(), "nf_tokens")
235 )
236 );
237
238 statements.emplace_back(
239 fmt::format(
240 R"(
241 CREATE TABLE IF NOT EXISTS {}
242 (
243 issuer blob,
244 taxon bigint,
245 token_id blob,
246 PRIMARY KEY (issuer, taxon, token_id)
247 )
248 WITH CLUSTERING ORDER BY (taxon ASC, token_id ASC)
249 )",
250 qualifiedTableName(settingsProvider_.get(), "issuer_nf_tokens_v2")
251 )
252 );
253
254 statements.emplace_back(
255 fmt::format(
256 R"(
257 CREATE TABLE IF NOT EXISTS {}
258 (
259 token_id blob,
260 sequence bigint,
261 uri blob,
262 PRIMARY KEY (token_id, sequence)
263 )
264 WITH CLUSTERING ORDER BY (sequence DESC)
265 )",
266 qualifiedTableName(settingsProvider_.get(), "nf_token_uris")
267 )
268 );
269
270 statements.emplace_back(
271 fmt::format(
272 R"(
273 CREATE TABLE IF NOT EXISTS {}
274 (
275 token_id blob,
276 seq_idx tuple<bigint, bigint>,
277 hash blob,
278 PRIMARY KEY (token_id, seq_idx)
279 )
280 WITH CLUSTERING ORDER BY (seq_idx DESC)
281 )",
282 qualifiedTableName(settingsProvider_.get(), "nf_token_transactions")
283 )
284 );
285
286 statements.emplace_back(
287 fmt::format(
288 R"(
289 CREATE TABLE IF NOT EXISTS {}
290 (
291 mpt_id blob,
292 holder blob,
293 PRIMARY KEY (mpt_id, holder)
294 )
295 WITH CLUSTERING ORDER BY (holder ASC)
296 )",
297 qualifiedTableName(settingsProvider_.get(), "mp_token_holders")
298 )
299 );
300
301 statements.emplace_back(
302 fmt::format(
303 R"(
304 CREATE TABLE IF NOT EXISTS {}
305 (
306 migrator_name TEXT,
307 status TEXT,
308 PRIMARY KEY (migrator_name)
309 )
310 )",
311 qualifiedTableName(settingsProvider_.get(), "migrator_status")
312 )
313 );
314
315 statements.emplace_back(
316 fmt::format(
317 R"(
318 CREATE TABLE IF NOT EXISTS {}
319 (
320 node_id UUID,
321 message TEXT,
322 PRIMARY KEY (node_id)
323 )
324 WITH default_time_to_live = 2
325 )",
326 qualifiedTableName(settingsProvider_.get(), "nodes_chat")
327 )
328 );
329
330 return statements;
331 }();
332
337 std::reference_wrapper<SettingsProviderType const> settingsProvider_;
338 std::reference_wrapper<Handle const> handle_;
339
340 public:
347 Statements(SettingsProviderType const& settingsProvider, Handle const& handle)
348 : settingsProvider_{settingsProvider}, handle_{std::cref(handle)}
349 {
350 }
351
352 //
353 // Insert queries
354 //
355
356 PreparedStatement insertObject = [this]() {
357 return handle_.get().prepare(
358 fmt::format(
359 R"(
360 INSERT INTO {}
361 (key, sequence, object)
362 VALUES (?, ?, ?)
363 )",
364 qualifiedTableName(settingsProvider_.get(), "objects")
365 )
366 );
367 }();
368
369 PreparedStatement insertTransaction = [this]() {
370 return handle_.get().prepare(
371 fmt::format(
372 R"(
373 INSERT INTO {}
374 (hash, ledger_sequence, date, transaction, metadata)
375 VALUES (?, ?, ?, ?, ?)
376 )",
377 qualifiedTableName(settingsProvider_.get(), "transactions")
378 )
379 );
380 }();
381
382 PreparedStatement insertLedgerTransaction = [this]() {
383 return handle_.get().prepare(
384 fmt::format(
385 R"(
386 INSERT INTO {}
387 (ledger_sequence, hash)
388 VALUES (?, ?)
389 )",
390 qualifiedTableName(settingsProvider_.get(), "ledger_transactions")
391 )
392 );
393 }();
394
395 PreparedStatement insertSuccessor = [this]() {
396 return handle_.get().prepare(
397 fmt::format(
398 R"(
399 INSERT INTO {}
400 (key, seq, next)
401 VALUES (?, ?, ?)
402 )",
403 qualifiedTableName(settingsProvider_.get(), "successor")
404 )
405 );
406 }();
407
408 PreparedStatement insertDiff = [this]() {
409 return handle_.get().prepare(
410 fmt::format(
411 R"(
412 INSERT INTO {}
413 (seq, key)
414 VALUES (?, ?)
415 )",
416 qualifiedTableName(settingsProvider_.get(), "diff")
417 )
418 );
419 }();
420
421 PreparedStatement insertAccountTx = [this]() {
422 return handle_.get().prepare(
423 fmt::format(
424 R"(
425 INSERT INTO {}
426 (account, seq_idx, hash)
427 VALUES (?, ?, ?)
428 )",
429 qualifiedTableName(settingsProvider_.get(), "account_tx")
430 )
431 );
432 }();
433
434 PreparedStatement insertNFT = [this]() {
435 return handle_.get().prepare(
436 fmt::format(
437 R"(
438 INSERT INTO {}
439 (token_id, sequence, owner, is_burned)
440 VALUES (?, ?, ?, ?)
441 )",
442 qualifiedTableName(settingsProvider_.get(), "nf_tokens")
443 )
444 );
445 }();
446
447 PreparedStatement insertIssuerNFT = [this]() {
448 return handle_.get().prepare(
449 fmt::format(
450 R"(
451 INSERT INTO {}
452 (issuer, taxon, token_id)
453 VALUES (?, ?, ?)
454 )",
455 qualifiedTableName(settingsProvider_.get(), "issuer_nf_tokens_v2")
456 )
457 );
458 }();
459
460 PreparedStatement insertNFTURI = [this]() {
461 return handle_.get().prepare(
462 fmt::format(
463 R"(
464 INSERT INTO {}
465 (token_id, sequence, uri)
466 VALUES (?, ?, ?)
467 )",
468 qualifiedTableName(settingsProvider_.get(), "nf_token_uris")
469 )
470 );
471 }();
472
473 PreparedStatement insertNFTTx = [this]() {
474 return handle_.get().prepare(
475 fmt::format(
476 R"(
477 INSERT INTO {}
478 (token_id, seq_idx, hash)
479 VALUES (?, ?, ?)
480 )",
481 qualifiedTableName(settingsProvider_.get(), "nf_token_transactions")
482 )
483 );
484 }();
485
486 PreparedStatement insertMPTHolder = [this]() {
487 return handle_.get().prepare(
488 fmt::format(
489 R"(
490 INSERT INTO {}
491 (mpt_id, holder)
492 VALUES (?, ?)
493 )",
494 qualifiedTableName(settingsProvider_.get(), "mp_token_holders")
495 )
496 );
497 }();
498
499 PreparedStatement insertLedgerHeader = [this]() {
500 return handle_.get().prepare(
501 fmt::format(
502 R"(
503 INSERT INTO {}
504 (sequence, header)
505 VALUES (?, ?)
506 )",
507 qualifiedTableName(settingsProvider_.get(), "ledgers")
508 )
509 );
510 }();
511
512 PreparedStatement insertLedgerHash = [this]() {
513 return handle_.get().prepare(
514 fmt::format(
515 R"(
516 INSERT INTO {}
517 (hash, sequence)
518 VALUES (?, ?)
519 )",
520 qualifiedTableName(settingsProvider_.get(), "ledger_hashes")
521 )
522 );
523 }();
524
525 //
526 // Update (and "delete") queries
527 //
528
529 PreparedStatement updateLedgerRange = [this]() {
530 return handle_.get().prepare(
531 fmt::format(
532 R"(
533 UPDATE {}
534 SET sequence = ?
535 WHERE is_latest = ?
536 IF sequence IN (?, null)
537 )",
538 qualifiedTableName(settingsProvider_.get(), "ledger_range")
539 )
540 );
541 }();
542
543 PreparedStatement deleteLedgerRange = [this]() {
544 return handle_.get().prepare(
545 fmt::format(
546 R"(
547 UPDATE {}
548 SET sequence = ?
549 WHERE is_latest = False
550 )",
551 qualifiedTableName(settingsProvider_.get(), "ledger_range")
552 )
553 );
554 }();
555
556 PreparedStatement insertMigratorStatus = [this]() {
557 return handle_.get().prepare(
558 fmt::format(
559 R"(
560 INSERT INTO {}
561 (migrator_name, status)
562 VALUES (?, ?)
563 )",
564 qualifiedTableName(settingsProvider_.get(), "migrator_status")
565 )
566 );
567 }();
568
569 PreparedStatement updateClioNodeMessage = [this]() {
570 return handle_.get().prepare(
571 fmt::format(
572 R"(
573 UPDATE {}
574 SET message = ?
575 WHERE node_id = ?
576 )",
577 qualifiedTableName(settingsProvider_.get(), "nodes_chat")
578 )
579 );
580 }();
581
582 //
583 // Select queries
584 //
585
586 PreparedStatement selectSuccessor = [this]() {
587 return handle_.get().prepare(
588 fmt::format(
589 R"(
590 SELECT next
591 FROM {}
592 WHERE key = ?
593 AND seq <= ?
594 ORDER BY seq DESC
595 LIMIT 1
596 )",
597 qualifiedTableName(settingsProvider_.get(), "successor")
598 )
599 );
600 }();
601
602 PreparedStatement selectDiff = [this]() {
603 return handle_.get().prepare(
604 fmt::format(
605 R"(
606 SELECT key
607 FROM {}
608 WHERE seq = ?
609 )",
610 qualifiedTableName(settingsProvider_.get(), "diff")
611 )
612 );
613 }();
614
615 PreparedStatement selectObject = [this]() {
616 return handle_.get().prepare(
617 fmt::format(
618 R"(
619 SELECT object, sequence
620 FROM {}
621 WHERE key = ?
622 AND sequence <= ?
623 ORDER BY sequence DESC
624 LIMIT 1
625 )",
626 qualifiedTableName(settingsProvider_.get(), "objects")
627 )
628 );
629 }();
630
631 PreparedStatement selectTransaction = [this]() {
632 return handle_.get().prepare(
633 fmt::format(
634 R"(
635 SELECT transaction, metadata, ledger_sequence, date
636 FROM {}
637 WHERE hash = ?
638 )",
639 qualifiedTableName(settingsProvider_.get(), "transactions")
640 )
641 );
642 }();
643
644 PreparedStatement selectAllTransactionHashesInLedger = [this]() {
645 return handle_.get().prepare(
646 fmt::format(
647 R"(
648 SELECT hash
649 FROM {}
650 WHERE ledger_sequence = ?
651 )",
652 qualifiedTableName(settingsProvider_.get(), "ledger_transactions")
653 )
654 );
655 }();
656
657 PreparedStatement selectLedgerPageKeys = [this]() {
658 return handle_.get().prepare(
659 fmt::format(
660 R"(
661 SELECT key
662 FROM {}
663 WHERE TOKEN(key) >= ?
664 AND sequence <= ?
665 PER PARTITION LIMIT 1
666 LIMIT ?
667 ALLOW FILTERING
668 )",
669 qualifiedTableName(settingsProvider_.get(), "objects")
670 )
671 );
672 }();
673
674 PreparedStatement selectLedgerPage = [this]() {
675 return handle_.get().prepare(
676 fmt::format(
677 R"(
678 SELECT object, key
679 FROM {}
680 WHERE TOKEN(key) >= ?
681 AND sequence <= ?
682 PER PARTITION LIMIT 1
683 LIMIT ?
684 ALLOW FILTERING
685 )",
686 qualifiedTableName(settingsProvider_.get(), "objects")
687 )
688 );
689 }();
690
691 PreparedStatement getToken = [this]() {
692 return handle_.get().prepare(
693 fmt::format(
694 R"(
695 SELECT TOKEN(key)
696 FROM {}
697 WHERE key = ?
698 LIMIT 1
699 )",
700 qualifiedTableName(settingsProvider_.get(), "objects")
701 )
702 );
703 }();
704
705 PreparedStatement selectAccountTx = [this]() {
706 return handle_.get().prepare(
707 fmt::format(
708 R"(
709 SELECT hash, seq_idx
710 FROM {}
711 WHERE account = ?
712 AND seq_idx < ?
713 LIMIT ?
714 )",
715 qualifiedTableName(settingsProvider_.get(), "account_tx")
716 )
717 );
718 }();
719
720 PreparedStatement selectAccountFromBeginning = [this]() {
721 return handle_.get().prepare(
722 fmt::format(
723 R"(
724 SELECT account
725 FROM {}
726 WHERE token(account) > 0
727 PER PARTITION LIMIT 1
728 LIMIT ?
729 )",
730 qualifiedTableName(settingsProvider_.get(), "account_tx")
731 )
732 );
733 }();
734
735 PreparedStatement selectAccountFromToken = [this]() {
736 return handle_.get().prepare(
737 fmt::format(
738 R"(
739 SELECT account
740 FROM {}
741 WHERE token(account) > token(?)
742 PER PARTITION LIMIT 1
743 LIMIT ?
744 )",
745 qualifiedTableName(settingsProvider_.get(), "account_tx")
746 )
747 );
748 }();
749
750 PreparedStatement selectAccountTxForward = [this]() {
751 return handle_.get().prepare(
752 fmt::format(
753 R"(
754 SELECT hash, seq_idx
755 FROM {}
756 WHERE account = ?
757 AND seq_idx > ?
758 ORDER BY seq_idx ASC
759 LIMIT ?
760 )",
761 qualifiedTableName(settingsProvider_.get(), "account_tx")
762 )
763 );
764 }();
765
766 PreparedStatement selectNFT = [this]() {
767 return handle_.get().prepare(
768 fmt::format(
769 R"(
770 SELECT sequence, owner, is_burned
771 FROM {}
772 WHERE token_id = ?
773 AND sequence <= ?
774 ORDER BY sequence DESC
775 LIMIT 1
776 )",
777 qualifiedTableName(settingsProvider_.get(), "nf_tokens")
778 )
779 );
780 }();
781
782 PreparedStatement selectNFTURI = [this]() {
783 return handle_.get().prepare(
784 fmt::format(
785 R"(
786 SELECT uri
787 FROM {}
788 WHERE token_id = ?
789 AND sequence <= ?
790 ORDER BY sequence DESC
791 LIMIT 1
792 )",
793 qualifiedTableName(settingsProvider_.get(), "nf_token_uris")
794 )
795 );
796 }();
797
798 PreparedStatement selectNFTTx = [this]() {
799 return handle_.get().prepare(
800 fmt::format(
801 R"(
802 SELECT hash, seq_idx
803 FROM {}
804 WHERE token_id = ?
805 AND seq_idx < ?
806 ORDER BY seq_idx DESC
807 LIMIT ?
808 )",
809 qualifiedTableName(settingsProvider_.get(), "nf_token_transactions")
810 )
811 );
812 }();
813
814 PreparedStatement selectNFTTxForward = [this]() {
815 return handle_.get().prepare(
816 fmt::format(
817 R"(
818 SELECT hash, seq_idx
819 FROM {}
820 WHERE token_id = ?
821 AND seq_idx >= ?
822 ORDER BY seq_idx ASC
823 LIMIT ?
824 )",
825 qualifiedTableName(settingsProvider_.get(), "nf_token_transactions")
826 )
827 );
828 }();
829
830 PreparedStatement selectNFTIDsByIssuer = [this]() {
831 return handle_.get().prepare(
832 fmt::format(
833 R"(
834 SELECT token_id
835 FROM {}
836 WHERE issuer = ?
837 AND (taxon, token_id) > ?
838 ORDER BY taxon ASC, token_id ASC
839 LIMIT ?
840 )",
841 qualifiedTableName(settingsProvider_.get(), "issuer_nf_tokens_v2")
842 )
843 );
844 }();
845
846 PreparedStatement selectNFTIDsByIssuerTaxon = [this]() {
847 return handle_.get().prepare(
848 fmt::format(
849 R"(
850 SELECT token_id
851 FROM {}
852 WHERE issuer = ?
853 AND taxon = ?
854 AND token_id > ?
855 ORDER BY taxon ASC, token_id ASC
856 LIMIT ?
857 )",
858 qualifiedTableName(settingsProvider_.get(), "issuer_nf_tokens_v2")
859 )
860 );
861 }();
862
863 PreparedStatement selectMPTHolders = [this]() {
864 return handle_.get().prepare(
865 fmt::format(
866 R"(
867 SELECT holder
868 FROM {}
869 WHERE mpt_id = ?
870 AND holder > ?
871 ORDER BY holder ASC
872 LIMIT ?
873 )",
874 qualifiedTableName(settingsProvider_.get(), "mp_token_holders")
875 )
876 );
877 }();
878
879 PreparedStatement selectLedgerByHash = [this]() {
880 return handle_.get().prepare(
881 fmt::format(
882 R"(
883 SELECT sequence
884 FROM {}
885 WHERE hash = ?
886 LIMIT 1
887 )",
888 qualifiedTableName(settingsProvider_.get(), "ledger_hashes")
889 )
890 );
891 }();
892
893 PreparedStatement selectLedgerBySeq = [this]() {
894 return handle_.get().prepare(
895 fmt::format(
896 R"(
897 SELECT header
898 FROM {}
899 WHERE sequence = ?
900 )",
901 qualifiedTableName(settingsProvider_.get(), "ledgers")
902 )
903 );
904 }();
905
906 PreparedStatement selectLatestLedger = [this]() {
907 return handle_.get().prepare(
908 fmt::format(
909 R"(
910 SELECT sequence
911 FROM {}
912 WHERE is_latest = True
913 )",
914 qualifiedTableName(settingsProvider_.get(), "ledger_range")
915 )
916 );
917 }();
918
919 PreparedStatement selectLedgerRange = [this]() {
920 return handle_.get().prepare(
921 fmt::format(
922 R"(
923 SELECT sequence
924 FROM {}
925 WHERE is_latest in (True, False)
926 )",
927 qualifiedTableName(settingsProvider_.get(), "ledger_range")
928 )
929 );
930 }();
931
932 PreparedStatement selectMigratorStatus = [this]() {
933 return handle_.get().prepare(
934 fmt::format(
935 R"(
936 SELECT status
937 FROM {}
938 WHERE migrator_name = ?
939 )",
940 qualifiedTableName(settingsProvider_.get(), "migrator_status")
941 )
942 );
943 }();
944
945 PreparedStatement selectClioNodesData = [this]() {
946 return handle_.get().prepare(
947 fmt::format(
948 R"(
949 SELECT node_id, message
950 FROM {}
951 )",
952 qualifiedTableName(settingsProvider_.get(), "nodes_chat")
953 )
954 );
955 }();
956 };
957
963 void
965 {
966 LOG(log_.info()) << "Preparing cassandra statements";
967 statements_ = std::make_unique<Statements>(settingsProvider_, handle);
968 LOG(log_.info()) << "Finished preparing statements";
969 }
970
976 std::unique_ptr<Statements> const&
978 {
979 return statements_;
980 }
981
982private:
983 std::unique_ptr<Statements> statements_{nullptr};
984};
985
986} // namespace data::cassandra
Represents a handle to the cassandra database cluster.
Definition Handle.hpp:46
Prepared statements holder.
Definition Schema.hpp:336
Statements(SettingsProviderType const &settingsProvider, Handle const &handle)
Construct a new Statements object.
Definition Schema.hpp:347
Manages the DB schema and provides access to prepared statements.
Definition Schema.hpp:55
Schema(SettingsProviderType const &settingsProvider)
Construct a new Schema object.
Definition Schema.hpp:65
void prepareStatements(Handle const &handle)
Recreates the prepared statements.
Definition Schema.hpp:964
std::unique_ptr< Statements > const & operator->() const
Provides access to statements.
Definition Schema.hpp:977
Represents a prepared statement on the DB side.
Definition Statement.hpp:163
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:111
Pump info(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::NFO severity.
Definition Logger.cpp:312
This namespace implements a wrapper for the Cassandra C++ driver.
Definition Concepts.hpp:37
std::string qualifiedTableName(SettingsProviderType const &provider, std::string_view name)
Returns the table name qualified with the keyspace and table prefix.
Definition Schema.hpp:46