Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
CursorFromDiffProvider.hpp
1#pragma once
2
3#include "data/BackendInterface.hpp"
4#include "data/Types.hpp"
5#include "etl/impl/BaseCursorProvider.hpp"
6#include "util/Assert.hpp"
7
8#include <xrpl/basics/base_uint.h>
9
10#include <algorithm>
11#include <cstddef>
12#include <cstdint>
13#include <iterator>
14#include <memory>
15#include <ranges>
16#include <set>
17#include <vector>
18
19namespace etl::impl {
20
21class CursorFromDiffProvider : public BaseCursorProvider {
22 std::shared_ptr<BackendInterface> backend_;
23 size_t numCursors_;
24
25public:
26 CursorFromDiffProvider(std::shared_ptr<BackendInterface> backend, size_t numCursors)
27 : backend_{std::move(backend)}, numCursors_{numCursors}
28 {
29 }
30
31 [[nodiscard]] std::vector<CursorPair>
32 getCursors(uint32_t const seq) const override
33 {
34 namespace rg = std::ranges;
35 namespace vs = std::views;
36
37 auto const fetchDiff = [this, seq](uint32_t offset) {
38 return data::synchronousAndRetryOnTimeout([this, seq, offset](auto yield) {
39 return backend_->fetchLedgerDiff(seq - offset, yield);
40 });
41 };
42
43 auto const range = backend_->fetchLedgerRange();
44 ASSERT(range.has_value(), "Ledger range is not available when cache is loading");
45
46 std::set<ripple::uint256> liveCursors;
47 std::set<ripple::uint256> deletedCursors;
48 auto i = 0;
49 while (liveCursors.size() < numCursors_ and seq - i >= range->minSequence) {
50 auto diffs = fetchDiff(i++);
51 rg::copy(
52 diffs //
53 | vs::filter([&deletedCursors](auto const& obj) {
54 return not obj.blob.empty() and !deletedCursors.contains(obj.key);
55 }) //
56 | vs::transform([](auto const& obj) { return obj.key; }),
57 std::inserter(liveCursors, std::begin(liveCursors))
58 );
59
60 // track the deleted objects
61 rg::copy(
62 diffs //
63 | vs::filter([](auto const& obj) { return obj.blob.empty(); }) //
64 | vs::transform([](auto const& obj) { return obj.key; }),
65 std::inserter(deletedCursors, std::begin(deletedCursors))
66 );
67 }
68
69 std::vector<ripple::uint256> cursors{data::kFIRST_KEY};
70 rg::copy(
71 liveCursors | vs::take(std::min(liveCursors.size(), numCursors_)),
72 std::back_inserter(cursors)
73 );
74 rg::sort(cursors);
75 cursors.push_back(data::kLAST_KEY);
76
77 std::vector<CursorPair> pairs;
78 pairs.reserve(cursors.size());
79
80 // FIXME: this should be `cursors | vs::pairwise` (C++23)
81 std::transform(
82 std::begin(cursors),
83 std::prev(std::end(cursors)),
84 std::next(std::begin(cursors)),
85 std::back_inserter(pairs),
86 [](auto&& a, auto&& b) -> CursorPair { return {a, b}; }
87 );
88
89 return pairs;
90 }
91};
92
93} // namespace etl::impl
auto synchronousAndRetryOnTimeout(FnType &&func)
Synchronously execute the given function object and retry until no DatabaseTimeout is thrown.
Definition BackendInterface.hpp:117
Definition BaseCursorProvider.hpp:14
Definition BaseCursorProvider.hpp:9