Clio  develop
The XRP Ledger API server.
Loading...
Searching...
No Matches
TaskManager.hpp
1#pragma once
2
3#include "etl/ExtractorInterface.hpp"
4#include "etl/LoaderInterface.hpp"
5#include "etl/MonitorInterface.hpp"
6#include "etl/SchedulerInterface.hpp"
7#include "etl/TaskManagerInterface.hpp"
8#include "etl/impl/Monitor.hpp"
9#include "etl/impl/TaskQueue.hpp"
10#include "util/async/AnyExecutionContext.hpp"
11#include "util/async/AnyOperation.hpp"
12#include "util/log/Logger.hpp"
13
14#include <xrpl/protocol/TxFormats.h>
15
16#include <atomic>
17#include <cstddef>
18#include <cstdint>
19#include <functional>
20#include <memory>
21#include <vector>
22
23namespace etl::impl {
24
/**
 * @brief Implementation of TaskManagerInterface living in namespace etl::impl.
 *
 * The class holds a shared scheduler, non-owning references to an extractor, a loader and a monitor,
 * a TaskQueue, and two collections of type-erased operations (extractors_ and loaders_).
 * It is neither copyable nor movable.
 *
 * NOTE(review): this listing skips its own line numbers 28, 45, 72 and 75, so at least one data member,
 * one constructor parameter and the line preceding each of spawnExtractor/spawnLoader are not visible here.
 * Confirm against the original header before relying on this declaration.
 */
25class TaskManager : public TaskManagerInterface {
    /** Compile-time constant; presumably the capacity limit applied to queue_ - TODO confirm in TaskManager.cpp. */
26 static constexpr auto kQUEUE_SIZE_LIMIT = 2048uz;
27
    // NOTE(review): listing line 28 is missing from this view (a member declared before schedulers_).
    /** Scheduler held through shared ownership. */
29 std::shared_ptr<SchedulerInterface> schedulers_;
    /** Non-owning reference to the extractor. */
30 std::reference_wrapper<ExtractorInterface> extractor_;
    /** Non-owning reference to the loader. */
31 std::reference_wrapper<LoaderInterface> loader_;
    /** Non-owning reference to the monitor. */
32 std::reference_wrapper<MonitorInterface> monitor_;
33
    /** Task queue owned by this manager. */
34 impl::TaskQueue queue_;
    /** Atomic 32-bit counter; presumably the next ledger sequence to forward - TODO confirm. */
35 std::atomic_uint32_t nextForwardSequence_;
36
    /** Type-erased operations; presumably the handles produced by spawnExtractor - TODO confirm. */
37 std::vector<util::async::AnyOperation<void>> extractors_;
    /** Type-erased operations; presumably the handles produced by spawnLoader - TODO confirm. */
38 std::vector<util::async::AnyOperation<void>> loaders_;
39
    /** Atomic flag, initialised to false. */
40 std::atomic_bool running_ = false;
    /** Logger constructed for the "ETL" channel. */
41 util::Logger log_{"ETL"};
42
43public:
    /**
     * @brief Construct the task manager.
     *
     * NOTE(review): listing line 45 (a parameter preceding @p scheduler) is missing from this view.
     *
     * @param scheduler Scheduler, passed as a shared pointer
     * @param extractor Reference to the extractor
     * @param loader Reference to the loader
     * @param monitor Reference to the monitor
     * @param startSeq Presumably the first sequence to process - TODO confirm against the definition
     */
44 TaskManager(
46 std::shared_ptr<SchedulerInterface> scheduler,
47 std::reference_wrapper<ExtractorInterface> extractor,
48 std::reference_wrapper<LoaderInterface> loader,
49 std::reference_wrapper<MonitorInterface> monitor,
50 uint32_t startSeq
51 );
52
    /** @brief Destructor; overrides the virtual destructor of the base interface. */
53 ~TaskManager() override;
54
    // Copying and moving are both disabled.
55 TaskManager(TaskManager const&) = delete;
56 TaskManager(TaskManager&&) = delete;
57 TaskManager&
58 operator=(TaskManager const&) = delete;
59 TaskManager&
60 operator=(TaskManager&&) = delete;
61
    /**
     * @brief Run the task manager (override of the interface method).
     *
     * @param numExtractors Presumably the number of extractors to spawn - TODO confirm against the definition
     */
62 void
63 run(std::size_t numExtractors) override;
64
    /** @brief Stop the task manager. */
65 void
66 stop() override;
67
68private:
    /** @brief Private helper; presumably waits for the stored operations to finish - TODO confirm in TaskManager.cpp. */
69 void
70 wait();
71
    // NOTE(review): listing line 72 is missing from this view; it presumably carries this declaration's return type - confirm.
    /** @brief Private helper taking the task queue by reference. */
73 spawnExtractor(TaskQueue& queue);
74
    // NOTE(review): listing line 75 is missing from this view; it presumably carries this declaration's return type - confirm.
    /** @brief Private helper taking the task queue by reference. */
76 spawnLoader(TaskQueue& queue);
77};
78
79} // namespace etl::impl
void stop() override
Stop the task manager.
Definition TaskManager.cpp:167
A wrapper for std::priority_queue that serialises operations using a mutex.
Definition TaskQueue.hpp:32
A simple thread-safe logger for the channel specified in the constructor.
Definition Logger.hpp:77
A type-erased execution context.
Definition AnyExecutionContext.hpp:22
A type-erased operation that can be executed via AnyExecutionContext.
Definition AnyOperation.hpp:25
An interface for the Task Manager.
Definition TaskManagerInterface.hpp:10