23#include "util/log/Logger.hpp"
36template <
typename RawDataType>
39 using DataType = std::optional<RawDataType>;
42 static constexpr auto kTOTAL_MAX_IN_QUEUE = 1000u;
48 uint32_t startSequence_;
50 std::vector<std::shared_ptr<QueueType>> queues_;
59 ExtractionDataPipe(uint32_t stride, uint32_t startSequence) : stride_{stride}, startSequence_{startSequence}
61 auto const maxQueueSize = kTOTAL_MAX_IN_QUEUE / stride;
62 for (
size_t i = 0; i < stride_; ++i)
63 queues_.push_back(std::make_unique<QueueType>(maxQueueSize));
77 getQueue(sequence)->push(std::move(
data));
91 return getQueue(sequence)->pop();
111 push(sequence, std::nullopt);
123 for (
auto i = 0u; i < stride_; ++i)
124 getQueue(i)->tryPop();
128 std::shared_ptr<QueueType>
129 getQueue(uint32_t sequence)
131 LOG(log_.
debug()) <<
"Grabbing extraction queue for " << sequence <<
"; start was " << startSequence_;
132 return queues_[(sequence - startSequence_) % stride_];
Generic thread-safe queue with a max capacity.
Definition ETLHelpers.hpp:44
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:111
Pump debug(SourceLocationType const &loc=CURRENT_SRC_LOCATION) const
Interface for logging at Severity::DBG severity.
Definition Logger.cpp:214
This namespace implements the data access layer and related components.
Definition AmendmentCenter.cpp:70