Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
CassandraMigrationBackend.hpp
1#pragma once
2
3#include "data/CassandraBackend.hpp"
4#include "data/LedgerCacheInterface.hpp"
5#include "data/cassandra/SettingsProvider.hpp"
6#include "migration/cassandra/impl/CassandraMigrationSchema.hpp"
7#include "migration/cassandra/impl/Spec.hpp"
8#include "util/log/Logger.hpp"
9
10#include <boost/asio/spawn.hpp>
11
12#include <cstdint>
13#include <string>
14#include <utility>
15
16namespace migration::cassandra {
17
22class CassandraMigrationBackend : public data::cassandra::CassandraBackend {
23 util::Logger log_{"Migration"};
24 data::cassandra::SettingsProvider settingsProvider_;
25 impl::CassandraMigrationSchema migrationSchema_;
26
27public:
35 data::cassandra::SettingsProvider settingsProvider,
37 )
38 : data::cassandra::CassandraBackend{auto{settingsProvider}, cache, false /* not readonly */}
39 , settingsProvider_(std::move(settingsProvider))
40 , migrationSchema_{settingsProvider_}
41 {
42 }
43
53 template <impl::TableSpec TableDesc>
54 void
56 std::int64_t const start,
57 std::int64_t const end,
58 auto const& callback,
59 boost::asio::yield_context yield
60 )
61 {
62 LOG(log_.debug()) << "Travsering token range: " << start << " - " << end
63 << " ; table: " << TableDesc::kTABLE_NAME;
64 // for each table we only have one prepared statement
65 static auto kSTATEMENT_PREPARED = migrationSchema_.getPreparedFullScanStatement(
66 handle_, TableDesc::kTABLE_NAME, TableDesc::kPARTITION_KEY
67 );
68
69 auto const statement = kSTATEMENT_PREPARED.bind(start, end);
70
71 auto const res = this->executor_.read(yield, statement);
72 if (not res) {
73 LOG(log_.error()) << "Could not fetch data from table: " << TableDesc::kTABLE_NAME
74 << " range: " << start << " - " << end << ";" << res.error();
75 return;
76 }
77
78 auto const& results = res.value();
79 if (not results.hasRows()) {
80 LOG(log_.debug()) << "No rows returned - table: " << TableDesc::kTABLE_NAME
81 << " range: " << start << " - " << end;
82 return;
83 }
84
85 for (auto const& row : std::apply(
86 [&](auto... args) { return data::cassandra::extract<decltype(args)...>(results); },
87 typename TableDesc::Row{}
88 )) {
89 callback(row);
90 }
91 }
92};
93} // namespace migration::cassandra
LedgerCacheInterface const & cache() const
Definition BackendInterface.hpp:151
Cache for an entire ledger.
Definition LedgerCacheInterface.hpp:21
Provides settings for BasicCassandraBackend.
Definition SettingsProvider.hpp:16
CassandraMigrationBackend(data::cassandra::SettingsProvider settingsProvider, data::LedgerCacheInterface &cache)
Construct a new Cassandra Migration Backend object. The backend is not readonly.
Definition CassandraMigrationBackend.hpp:34
void migrateInTokenRange(std::int64_t const start, std::int64_t const end, auto const &callback, boost::asio::yield_context yield)
Scan a table in a token range and call the callback for each row.
Definition CassandraMigrationBackend.hpp:55
The schema for the migration process. It contains the prepared statements only used for the migration...
Definition CassandraMigrationSchema.hpp:19
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:77
impl::ResultExtractor< Types... > extract(Handle::ResultType const &result)
Extracts the results into series of std::tuple<Types...> by creating a simple wrapper with an STL inp...
Definition Handle.hpp:314
This namespace implements the data access layer and related components.
Definition AmendmentCenter.cpp:56