xrpld
Loading...
Searching...
No Matches
libxrpl
nodestore
BatchWriter.cpp
1
#include <xrpl/nodestore/detail/BatchWriter.h>
2
3
#include <xrpl/beast/utility/instrumentation.h>
4
#include <xrpl/nodestore/NodeObject.h>
5
#include <xrpl/nodestore/Scheduler.h>
6
#include <xrpl/nodestore/Types.h>
7
8
#include <
algorithm
>
9
#include <
chrono
>
10
#include <
memory
>
11
#include <
mutex
>
12
#include <
vector
>
13
14
namespace
xrpl::NodeStore
{
15
16
BatchWriter::BatchWriter
(
Callback
& callback,
Scheduler
& scheduler)
17
:
callback_
(callback),
scheduler_
(scheduler)
18
{
19
writeSet_
.reserve(
kBatchWritePreallocationSize
);
20
}
21
22
BatchWriter::~BatchWriter
()
23
{
24
waitForWriting
();
25
}
26
27
void
28
BatchWriter::store
(
std::shared_ptr<NodeObject>
const
&
object
)
29
{
30
std::unique_lock
<
decltype
(
writeMutex_
)> sl(
writeMutex_
);
31
32
// If the batch has reached its limit, we wait
33
// until the batch writer is finished
34
while
(
writeSet_
.size() >=
kBatchWritePreallocationSize
)
35
writeCondition_
.wait(sl);
36
37
writeSet_
.push_back(
object
);
38
39
if
(!
writePending_
)
40
{
41
writePending_
=
true
;
42
43
scheduler_
.scheduleTask(*
this
);
44
}
45
}
46
47
int
48
BatchWriter::getWriteLoad
()
49
{
50
std::scoped_lock
const
sl(
writeMutex_
);
51
52
return
std::max
(
writeLoad_
,
static_cast<
int
>
(
writeSet_
.size()));
53
}
54
55
void
56
BatchWriter::performScheduledTask
()
57
{
58
writeBatch
();
59
}
60
61
void
62
BatchWriter::writeBatch
()
63
{
64
for
(;;)
65
{
66
std::vector<std::shared_ptr<NodeObject>
>
set
;
67
68
set
.reserve(
kBatchWritePreallocationSize
);
69
70
{
71
std::scoped_lock
const
sl(
writeMutex_
);
72
73
writeSet_
.swap(
set
);
74
XRPL_ASSERT(
75
writeSet_
.empty(),
"xrpl::NodeStore::BatchWriter::writeBatch : writes not set"
);
76
writeLoad_
=
set
.size();
77
78
if
(
set
.empty())
79
{
80
writePending_
=
false
;
81
writeCondition_
.notify_all();
82
83
// VFALCO NOTE Fix this function to not return from the middle
84
return
;
85
}
86
}
87
88
BatchWriteReport
report{};
89
report.
writeCount
=
set
.size();
90
auto
const
before =
std::chrono::steady_clock::now
();
91
92
callback_
.writeBatch(
set
);
93
94
report.
elapsed
=
std::chrono::duration_cast<std::chrono::milliseconds>
(
95
std::chrono::steady_clock::now
() - before);
96
97
scheduler_
.onBatchWrite(report);
98
}
99
}
100
101
void
102
BatchWriter::waitForWriting
()
103
{
104
std::unique_lock
<
decltype
(
writeMutex_
)> sl(
writeMutex_
);
105
106
while
(
writePending_
)
107
writeCondition_
.wait(sl);
108
}
109
110
}
// namespace xrpl::NodeStore
algorithm
chrono
xrpl::NodeStore::BatchWriter::store
void store(std::shared_ptr< NodeObject > const &object)
Store the object.
Definition
BatchWriter.cpp:28
xrpl::NodeStore::BatchWriter::performScheduledTask
void performScheduledTask() override
Performs the task.
Definition
BatchWriter.cpp:56
xrpl::NodeStore::BatchWriter::writeLoad_
int writeLoad_
Definition
BatchWriter.h:73
xrpl::NodeStore::BatchWriter::~BatchWriter
~BatchWriter() override
Destroy a batch writer.
Definition
BatchWriter.cpp:22
xrpl::NodeStore::BatchWriter::writeSet_
Batch writeSet_
Definition
BatchWriter.h:75
xrpl::NodeStore::BatchWriter::callback_
Callback & callback_
Definition
BatchWriter.h:69
xrpl::NodeStore::BatchWriter::writePending_
bool writePending_
Definition
BatchWriter.h:74
xrpl::NodeStore::BatchWriter::writeMutex_
LockType writeMutex_
Definition
BatchWriter.h:71
xrpl::NodeStore::BatchWriter::waitForWriting
void waitForWriting()
Definition
BatchWriter.cpp:102
xrpl::NodeStore::BatchWriter::writeBatch
void writeBatch()
Definition
BatchWriter.cpp:62
xrpl::NodeStore::BatchWriter::writeCondition_
CondvarType writeCondition_
Definition
BatchWriter.h:72
xrpl::NodeStore::BatchWriter::scheduler_
Scheduler & scheduler_
Definition
BatchWriter.h:70
xrpl::NodeStore::BatchWriter::getWriteLoad
int getWriteLoad()
Get an estimate of the amount of writing I/O pending.
Definition
BatchWriter.cpp:48
xrpl::NodeStore::BatchWriter::BatchWriter
BatchWriter(Callback &callback, Scheduler &scheduler)
Create a batch writer.
Definition
BatchWriter.cpp:16
xrpl::NodeStore::Scheduler
Scheduling for asynchronous backend activity.
Definition
include/xrpl/nodestore/Scheduler.h:41
std::chrono::duration_cast
T duration_cast(T... args)
std::max
T max(T... args)
memory
mutex
xrpl::NodeStore
Definition
ServiceRegistry.h:12
xrpl::NodeStore::kBatchWritePreallocationSize
static constexpr auto kBatchWritePreallocationSize
Definition
nodestore/Types.h:12
xrpl::set
bool set(T &target, std::string const &name, Section const §ion)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
Definition
BasicConfig.h:295
std::chrono::steady_clock::now
T now(T... args)
std::scoped_lock
std::shared_ptr
xrpl::NodeStore::BatchWriteReport
Contains information about a batch write operation.
Definition
include/xrpl/nodestore/Scheduler.h:25
xrpl::NodeStore::BatchWriteReport::elapsed
std::chrono::milliseconds elapsed
Definition
include/xrpl/nodestore/Scheduler.h:28
xrpl::NodeStore::BatchWriteReport::writeCount
int writeCount
Definition
include/xrpl/nodestore/Scheduler.h:29
xrpl::NodeStore::BatchWriter::Callback
This callback does the actual writing.
Definition
BatchWriter.h:25
std::unique_lock
vector
Generated by
1.16.1