// xrpld: Message.cpp
1#include <xrpld/overlay/Message.h>
2
3#include <xrpld/overlay/Compression.h>
4#include <xrpld/overlay/detail/TrafficCount.h>
5
6#include <xrpl/beast/utility/instrumentation.h>
7#include <xrpl/protocol/PublicKey.h>
8
9#include <google/protobuf/message.h>
10
11#include <xrpl.pb.h>
12
13#include <cstddef>
14#include <cstdint>
15#include <mutex>
16#include <optional>
17#include <vector>
18
19namespace xrpl {
20
22 ::google::protobuf::Message const& message,
23 protocol::MessageType type,
24 std::optional<PublicKey> const& validator)
25 : category_(static_cast<std::size_t>(TrafficCount::categorize(message, type, false)))
26 , validatorKey_(validator)
27{
28 using namespace xrpl::compression;
29
30 auto const messageBytes = messageSize(message);
31
32 XRPL_ASSERT(messageBytes, "xrpl::Message::Message : non-empty message input");
33
34 buffer_.resize(kHeaderBytes + messageBytes);
35
36 setHeader(buffer_.data(), messageBytes, type, Algorithm::None, 0);
37
38 if (messageBytes != 0)
39 message.SerializeToArray(buffer_.data() + kHeaderBytes, messageBytes);
40
41 XRPL_ASSERT(
42 getBufferSize() == totalSize(message),
43 "xrpl::Message::Message : message size matches the buffer");
44}
45
46// static
48Message::messageSize(::google::protobuf::Message const& message)
49{
50#if defined(GOOGLE_PROTOBUF_VERSION) && (GOOGLE_PROTOBUF_VERSION >= 3011000)
51 return message.ByteSizeLong();
52#else
53 return message.ByteSize();
54#endif
55}
56
57// static
59Message::totalSize(::google::protobuf::Message const& message)
60{
61 return messageSize(message) + compression::kHeaderBytes;
62}
63
64void
66{
67 using namespace xrpl::compression;
68 auto const messageBytes = buffer_.size() - kHeaderBytes;
69
70 auto type = getType(buffer_.data());
71
72 bool const compressible = [&] {
73 if (messageBytes <= 70)
74 return false;
75
76 // NOLINTNEXTLINE(bugprone-switch-missing-default-case)
77 switch (type)
78 {
79 case protocol::mtMANIFESTS:
80 case protocol::mtENDPOINTS:
81 case protocol::mtTRANSACTION:
82 case protocol::mtGET_LEDGER:
83 case protocol::mtLEDGER_DATA:
84 case protocol::mtGET_OBJECTS:
85 case protocol::mtVALIDATOR_LIST:
86 case protocol::mtVALIDATOR_LIST_COLLECTION:
87 case protocol::mtREPLAY_DELTA_RESPONSE:
88 case protocol::mtTRANSACTIONS:
89 return true;
90 case protocol::mtPING:
91 case protocol::mtCLUSTER:
92 case protocol::mtPROPOSE_LEDGER:
93 case protocol::mtSTATUS_CHANGE:
94 case protocol::mtHAVE_SET:
95 case protocol::mtVALIDATION:
96 case protocol::mtPROOF_PATH_REQ:
97 case protocol::mtPROOF_PATH_RESPONSE:
98 case protocol::mtREPLAY_DELTA_REQ:
99 case protocol::mtHAVE_TRANSACTIONS:
100 break;
101 }
102 return false;
103 }();
104
105 if (compressible)
106 {
107 auto payload = static_cast<void const*>(buffer_.data() + kHeaderBytes);
108
109 auto compressedSize = xrpl::compression::compress(
110 payload,
111 messageBytes,
112 [&](std::size_t inSize) { // size of required compressed buffer
115 });
116
117 if (compressedSize < (messageBytes - (kHeaderBytesCompressed - kHeaderBytes)))
118 {
119 bufferCompressed_.resize(kHeaderBytesCompressed + compressedSize);
120 // NOLINTNEXTLINE(readability-suspicious-call-argument)
121 setHeader(bufferCompressed_.data(), compressedSize, type, Algorithm::LZ4, messageBytes);
122 }
123 else
124 {
125 bufferCompressed_.resize(0);
126 }
127 }
128}
129
165void
167 std::uint8_t* in,
168 std::uint32_t payloadBytes,
169 int type,
171 std::uint32_t uncompressedBytes)
172{
173 auto h = in;
174
175 auto pack = [](std::uint8_t*& in, std::uint32_t size) {
176 *in++ = static_cast<std::uint8_t>((size >> 24) & 0x0F); // leftmost 4 are compression bits
177 *in++ = static_cast<std::uint8_t>((size >> 16) & 0xFF);
178 *in++ = static_cast<std::uint8_t>((size >> 8) & 0xFF);
179 *in++ = static_cast<std::uint8_t>(size & 0xFF);
180 };
181
182 pack(in, payloadBytes);
183
184 *in++ = static_cast<std::uint8_t>((type >> 8) & 0xFF);
185 *in++ = static_cast<std::uint8_t>(type & 0xFF);
186
188 {
189 pack(in, uncompressedBytes);
190 *h |= static_cast<std::uint8_t>(compression);
191 }
192}
193
196{
197 return buffer_.size();
198}
199
202{
203 if (tryCompressed == Compressed::Off)
204 return buffer_;
205
207
208 if (!bufferCompressed_.empty())
209 {
210 return bufferCompressed_;
211 }
212
213 return buffer_;
214}
215
216int
218{
219 int const type = (static_cast<int>(*(in + 4)) << 8) + *(in + 5);
220 return type;
221}
222
223} // namespace xrpl
T call_once(T... args)
compression::Compressed Compressed
Definition Message.h:31
std::once_flag onceFlag_
Definition Message.h:83
std::optional< PublicKey > validatorKey_
Definition Message.h:84
Message(::google::protobuf::Message const &message, protocol::MessageType type, std::optional< PublicKey > const &validator={})
Constructor.
Definition Message.cpp:21
std::size_t category_
Definition Message.h:82
compression::Algorithm Algorithm
Definition Message.h:32
static int getType(std::uint8_t const *in)
Get the message type from the payload header.
Definition Message.cpp:217
static std::size_t totalSize(::google::protobuf::Message const &message)
Definition Message.cpp:59
std::vector< uint8_t > const & getBuffer(Compressed tryCompressed)
Retrieve the packed message data.
Definition Message.cpp:201
std::vector< uint8_t > buffer_
Definition Message.h:80
static void setHeader(std::uint8_t *in, std::uint32_t payloadBytes, int type, Algorithm compression, std::uint32_t uncompressedBytes)
Set the payload header.
Definition Message.cpp:166
void compress()
Try to compress the payload.
Definition Message.cpp:65
static std::size_t messageSize(::google::protobuf::Message const &message)
Definition Message.cpp:48
std::size_t getBufferSize()
Retrieve the size of the packed but uncompressed message data.
Definition Message.cpp:195
std::vector< uint8_t > bufferCompressed_
Definition Message.h:81
TrafficCount is used to count ingress and egress wire bytes and number of messages.
STL namespace.
std::size_t compress(void const *in, std::size_t inSize, BufferFactory &&bf, Algorithm algorithm=Algorithm::LZ4)
Compress input data.
Definition Compression.h:67
constexpr std::size_t kHeaderBytesCompressed
Definition Compression.h:9
constexpr std::size_t kHeaderBytes
Definition Compression.h:8
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5