Clio develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
ExtractionDataPipe.hpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of clio: https://github.com/XRPLF/clio
4 Copyright (c) 2023, the clio developers.
5
6 Permission to use, copy, modify, and distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#pragma once
21
22#include "etl/ETLHelpers.hpp"
23#include "util/log/Logger.hpp"
24
25#include <cstddef>
26#include <cstdint>
27#include <memory>
28#include <optional>
29#include <vector>
30
31namespace etl::impl {
32
36template <typename RawDataType>
38public:
39 using DataType = std::optional<RawDataType>;
40 using QueueType = ThreadSafeQueue<DataType>; // TODO: probably should use boost::lockfree::queue instead?
41
42 static constexpr auto kTOTAL_MAX_IN_QUEUE = 1000u;
43
44private:
45 util::Logger log_{"ETL"};
46
47 uint32_t stride_;
48 uint32_t startSequence_;
49
50 std::vector<std::shared_ptr<QueueType>> queues_;
51
52public:
59 ExtractionDataPipe(uint32_t stride, uint32_t startSequence) : stride_{stride}, startSequence_{startSequence}
60 {
61 auto const maxQueueSize = kTOTAL_MAX_IN_QUEUE / stride;
62 for (size_t i = 0; i < stride_; ++i)
63 queues_.push_back(std::make_unique<QueueType>(maxQueueSize));
64 }
65
74 void
75 push(uint32_t sequence, DataType&& data)
76 {
77 getQueue(sequence)->push(std::move(data));
78 }
79
88 DataType
89 popNext(uint32_t sequence)
90 {
91 return getQueue(sequence)->pop();
92 }
93
97 uint32_t
98 getStride() const
99 {
100 return stride_;
101 }
102
107 void
108 finish(uint32_t sequence)
109 {
110 // empty optional hints the Transformer to shut down
111 push(sequence, std::nullopt);
112 }
113
119 void
121 {
122 // TODO: this should not have to be called by hand. it should be done via RAII
123 for (auto i = 0u; i < stride_; ++i)
124 getQueue(i)->tryPop(); // pop from each queue that might be blocked on a push
125 }
126
127private:
128 std::shared_ptr<QueueType>
129 getQueue(uint32_t sequence)
130 {
131 LOG(log_.debug()) << "Grabbing extraction queue for " << sequence << "; start was " << startSequence_;
132 return queues_[(sequence - startSequence_) % stride_];
133 }
134};
135
136} // namespace etl::impl
Generic thread-safe queue with a max capacity.
Definition ETLHelpers.hpp:44
A collection of thread safe async queues used by Extractor and Transformer to communicate.
Definition ExtractionDataPipe.hpp:37
void cleanup()
Unblocks internal queues.
Definition ExtractionDataPipe.hpp:120
ExtractionDataPipe(uint32_t stride, uint32_t startSequence)
Create a new instance of the extraction data pipe.
Definition ExtractionDataPipe.hpp:59
void finish(uint32_t sequence)
Hint the Transformer that the queue is done sending data.
Definition ExtractionDataPipe.hpp:108
void push(uint32_t sequence, DataType &&data)
Push new data package for the specified sequence.
Definition ExtractionDataPipe.hpp:75
uint32_t getStride() const
Definition ExtractionDataPipe.hpp:98
DataType popNext(uint32_t sequence)
Get data package for the given sequence.
Definition ExtractionDataPipe.hpp:89
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:111
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:214
This namespace implements the data access layer and related components.
Definition AmendmentCenter.cpp:70