rippled
Loading...
Searching...
No Matches
Message.cpp
1#include <xrpld/overlay/Message.h>
2#include <xrpld/overlay/detail/TrafficCount.h>
3
4#include <cstdint>
5
6namespace xrpl {
7
// Message constructor: packs `message` into buffer_ as an uncompressed wire frame
// (a header of compression::headerBytes bytes followed by the serialized protobuf payload).
// NOTE(review): the opening `Message::Message(` line (embedded line 8) is absent from this
// listing; the index at the end of the page gives the full signature.
//
// message   - protobuf message to serialize.
// type      - message type; written into the header and passed to TrafficCount::categorize.
// validator - optional public key, copied into validatorKey_.
9 ::google::protobuf::Message const& message,
10 protocol::MessageType type,
11 std::optional<PublicKey> const& validator)
12 : category_(TrafficCount::categorize(message, type, false)), validatorKey_(validator)
13{
14 using namespace xrpl::compression;
15
16 auto const messageBytes = messageSize(message);
17
// An empty payload is flagged by the assertion; the `messageBytes != 0` guard further down
// still skips serialization in that case.
18 XRPL_ASSERT(messageBytes, "xrpl::Message::Message : non-empty message input");
19
// Reserve room for the header plus the payload in a single resize.
20 buffer_.resize(headerBytes + messageBytes);
21
// Uncompressed frame: Algorithm::None, so the uncompressed-size argument (0) is not written.
22 setHeader(buffer_.data(), messageBytes, type, Algorithm::None, 0);
23
// Serialize the payload directly behind the header.
24 if (messageBytes != 0)
25 message.SerializeToArray(buffer_.data() + headerBytes, messageBytes);
26
// Sanity check: the buffer holds exactly header + payload bytes.
27 XRPL_ASSERT(
28 getBufferSize() == totalSize(message),
29 "xrpl::Message::Message : message size matches the buffer");
30}
31
// Returns the serialized size of `message` as reported by protobuf.
// ByteSizeLong() is used when GOOGLE_PROTOBUF_VERSION is defined and at least 3011000;
// otherwise the code falls back to ByteSize().
// NOTE(review): the return-type line (embedded line 33) is absent from this listing; the
// index at the end of the page lists it as `static std::size_t`.
32// static
34Message::messageSize(::google::protobuf::Message const& message)
35{
36#if defined(GOOGLE_PROTOBUF_VERSION) && (GOOGLE_PROTOBUF_VERSION >= 3011000)
37 return message.ByteSizeLong();
38#else
39 return message.ByteSize();
40#endif
41}
42
// Returns the size of the uncompressed frame for `message`: the serialized payload size
// (messageSize) plus compression::headerBytes for the header.
// NOTE(review): the return-type line (embedded line 44) is absent from this listing; the
// index at the end of the page lists it as `static std::size_t`.
43// static
45Message::totalSize(::google::protobuf::Message const& message)
46{
47 return messageSize(message) + compression::headerBytes;
48}
49
// Tries to build a compressed copy of the packed message in bufferCompressed_.
// Compression is attempted only for payloads larger than 70 bytes whose type is in the
// allow-list below, and the result is kept only when it is smaller than the uncompressed frame.
// NOTE(review): the declarator (embedded line 51) is absent from this listing; the index at
// the end of the page lists this function as `void compress()`.
50void
52{
53 using namespace xrpl::compression;
// Payload size = everything in buffer_ after the uncompressed header.
54 auto const messageBytes = buffer_.size() - headerBytes;
55
// Recover the message type from the header already written into buffer_.
56 auto type = getType(buffer_.data());
57
// Decide whether this message is worth compressing.
58 bool const compressible = [&] {
// Payloads of 70 bytes or fewer are never compressed.
59 if (messageBytes <= 70)
60 return false;
61
// The switch lists message types explicitly: the first group is compressible, the second is not.
62 // NOLINTNEXTLINE(bugprone-switch-missing-default-case)
63 switch (type)
64 {
65 case protocol::mtMANIFESTS:
66 case protocol::mtENDPOINTS:
67 case protocol::mtTRANSACTION:
68 case protocol::mtGET_LEDGER:
69 case protocol::mtLEDGER_DATA:
70 case protocol::mtGET_OBJECTS:
71 case protocol::mtVALIDATOR_LIST:
72 case protocol::mtVALIDATOR_LIST_COLLECTION:
73 case protocol::mtREPLAY_DELTA_RESPONSE:
74 case protocol::mtTRANSACTIONS:
75 return true;
76 case protocol::mtPING:
77 case protocol::mtCLUSTER:
78 case protocol::mtPROPOSE_LEDGER:
79 case protocol::mtSTATUS_CHANGE:
80 case protocol::mtHAVE_SET:
81 case protocol::mtVALIDATION:
82 case protocol::mtPROOF_PATH_REQ:
83 case protocol::mtPROOF_PATH_RESPONSE:
84 case protocol::mtREPLAY_DELTA_REQ:
85 case protocol::mtHAVE_TRANSACTIONS:
86 break;
87 }
// Any type that did not return true above is treated as not compressible.
88 return false;
89 }();
90
91 if (compressible)
92 {
93 auto payload = static_cast<void const*>(buffer_.data() + headerBytes);
94
// The buffer-factory callback sizes bufferCompressed_ for the requested output plus
// headerBytesCompressed, and returns a pointer just past that header space so the
// compressed bytes are written behind where the header will go.
95 auto compressedSize = xrpl::compression::compress(
96 payload,
97 messageBytes,
98 [&](std::size_t inSize) { // size of required compressed buffer
99 bufferCompressed_.resize(inSize + headerBytesCompressed);
100 return (bufferCompressed_.data() + headerBytesCompressed);
101 });
102
// Keep the compressed form only if it is smaller overall, i.e.
// headerBytesCompressed + compressedSize < headerBytes + messageBytes.
103 if (compressedSize < (messageBytes - (headerBytesCompressed - headerBytes)))
104 {
// Trim to the actual compressed size, then write the compressed header, which also
// records the original (uncompressed) payload size.
105 bufferCompressed_.resize(headerBytesCompressed + compressedSize);
106 // NOLINTNEXTLINE(readability-suspicious-call-argument)
107 setHeader(bufferCompressed_.data(), compressedSize, type, Algorithm::LZ4, messageBytes);
108 }
109 else
110 {
// NOTE(review): embedded line 111 (the body of this else branch) is absent from this
// listing, so what happens when compression does not pay off is not visible here.
112 }
113 }
114}
115
// Writes the wire header at `in`.
// Layout as written by this code (most-significant byte first):
//   bytes 0-3 : payloadBytes, with the top 4 bits of byte 0 masked off (0x0F)
//   bytes 4-5 : type (low 16 bits)
//   bytes 6-9 : uncompressedBytes, packed like bytes 0-3 -- only when compression != Algorithm::None
// When compression is in use, the algorithm value is OR'ed into byte 0.
// NOTE(review): the declarator lines (embedded lines 152-153) are absent from this listing;
// the index at the end of the page shows the first parameter as `std::uint8_t *in`.
// The caller must provide enough writable bytes at `in` for the fields listed above.
151void
154 std::uint32_t payloadBytes,
155 int type,
156 Algorithm compression,
157 std::uint32_t uncompressedBytes)
158{
// Remember the start of the header; `in` is advanced as bytes are written.
159 auto h = in;
160
// Writes `size` as 4 bytes, most-significant first, advancing the caller's cursor
// (the pointer is taken by reference). Bits 28-31 of `size` are dropped by the 0x0F mask.
161 auto pack = [](std::uint8_t*& in, std::uint32_t size) {
162 *in++ = static_cast<std::uint8_t>((size >> 24) & 0x0F); // leftmost 4 are compression bits
163 *in++ = static_cast<std::uint8_t>((size >> 16) & 0xFF);
164 *in++ = static_cast<std::uint8_t>((size >> 8) & 0xFF);
165 *in++ = static_cast<std::uint8_t>(size & 0xFF);
166 };
167
168 pack(in, payloadBytes);
169
// Message type: two bytes, high byte first.
170 *in++ = static_cast<std::uint8_t>((type >> 8) & 0xFF);
171 *in++ = static_cast<std::uint8_t>(type & 0xFF);
172
173 if (compression != Algorithm::None)
174 {
// Compressed frames carry the original payload size as an extra 4-byte field.
175 pack(in, uncompressedBytes);
// Mark the algorithm in the first header byte.
// NOTE(review): assumes Algorithm's values occupy only the upper 4 bits of a byte,
// so they do not collide with the size bits -- confirm against Compression.h.
176 *h |= static_cast<std::uint8_t>(compression);
177 }
178}
179
// Body of getBufferSize(): returns the size of buffer_, i.e. the header plus the
// uncompressed payload written by the constructor.
// NOTE(review): the return-type and declarator lines (embedded lines 180-181) are absent
// from this listing; the index at the end of the page lists `std::size_t getBufferSize()`.
182{
183 return buffer_.size();
184}
185
// Body of getBuffer(Compressed tryCompressed): returns the packed message bytes, either the
// uncompressed buffer_ or the compressed bufferCompressed_.
// NOTE(review): the return-type and declarator lines (embedded lines 186-187) are absent from
// this listing; the index at the end of the page lists
// `std::vector<uint8_t> const& getBuffer(Compressed tryCompressed)`.
188{
// Caller did not ask for compression: always hand back the uncompressed frame.
189 if (tryCompressed == Compressed::Off)
190 return buffer_;
191
// NOTE(review): embedded lines 192 and 194 are absent from this listing. The page index
// mentions call_once, once_flag_ and empty(), so presumably compress() is run once here and
// the block below is guarded by a check that bufferCompressed_ is not empty -- confirm
// against the real source.
193
195 {
196 return bufferCompressed_;
197 }
198
// Fall back to the uncompressed frame.
199 return buffer_;
200}
201
// Extracts the message type from a header written by setHeader: bytes 4 and 5 are combined
// as a 16-bit value with byte 4 as the high byte.
// The caller must supply at least 6 readable bytes at `in`.
// NOTE(review): the declarator (embedded line 203) is absent from this listing; the index at
// the end of the page lists `static int getType(std::uint8_t const *in)`.
202int
204{
205 int const type = (static_cast<int>(*(in + 4)) << 8) + *(in + 5);
206 return type;
207}
208
209} // namespace xrpl
T call_once(T... args)
Message(::google::protobuf::Message const &message, protocol::MessageType type, std::optional< PublicKey > const &validator={})
Constructor.
Definition Message.cpp:8
static int getType(std::uint8_t const *in)
Get the message type from the payload header.
Definition Message.cpp:203
std::once_flag once_flag_
Definition Message.h:83
static std::size_t totalSize(::google::protobuf::Message const &message)
Definition Message.cpp:45
std::vector< uint8_t > const & getBuffer(Compressed tryCompressed)
Retrieve the packed message data.
Definition Message.cpp:187
std::vector< uint8_t > buffer_
Definition Message.h:80
static void setHeader(std::uint8_t *in, std::uint32_t payloadBytes, int type, Algorithm compression, std::uint32_t uncompressedBytes)
Set the payload header.
Definition Message.cpp:152
void compress()
Try to compress the payload.
Definition Message.cpp:51
static std::size_t messageSize(::google::protobuf::Message const &message)
Definition Message.cpp:34
std::size_t getBufferSize()
Retrieve the size of the packed but uncompressed message data.
Definition Message.cpp:181
std::vector< uint8_t > bufferCompressed_
Definition Message.h:81
TrafficCount is used to count ingress and egress wire bytes and number of messages.
T data(T... args)
T empty(T... args)
std::size_t compress(void const *in, std::size_t inSize, BufferFactory &&bf, Algorithm algorithm=Algorithm::LZ4)
Compress input data.
Definition Compression.h:69
std::size_t constexpr headerBytes
Definition Compression.h:10
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
T resize(T... args)
T size(T... args)