rippled
Loading...
Searching...
No Matches
Message.cpp
#include <xrpld/overlay/Message.h>
#include <xrpld/overlay/detail/TrafficCount.h>

#include <cstdint>
#include <mutex>
5
6namespace ripple {
7
9 ::google::protobuf::Message const& message,
10 protocol::MessageType type,
11 std::optional<PublicKey> const& validator)
12 : category_(TrafficCount::categorize(message, type, false))
13 , validatorKey_(validator)
14{
15 using namespace ripple::compression;
16
17 auto const messageBytes = messageSize(message);
18
19 XRPL_ASSERT(
20 messageBytes, "ripple::Message::Message : non-empty message input");
21
22 buffer_.resize(headerBytes + messageBytes);
23
24 setHeader(buffer_.data(), messageBytes, type, Algorithm::None, 0);
25
26 if (messageBytes != 0)
27 message.SerializeToArray(buffer_.data() + headerBytes, messageBytes);
28
29 XRPL_ASSERT(
30 getBufferSize() == totalSize(message),
31 "ripple::Message::Message : message size matches the buffer");
32}
33
34// static
36Message::messageSize(::google::protobuf::Message const& message)
37{
38#if defined(GOOGLE_PROTOBUF_VERSION) && (GOOGLE_PROTOBUF_VERSION >= 3011000)
39 return message.ByteSizeLong();
40#else
41 return message.ByteSize();
42#endif
43}
44
45// static
47Message::totalSize(::google::protobuf::Message const& message)
48{
49 return messageSize(message) + compression::headerBytes;
50}
51
52void
54{
55 using namespace ripple::compression;
56 auto const messageBytes = buffer_.size() - headerBytes;
57
58 auto type = getType(buffer_.data());
59
60 bool const compressible = [&] {
61 if (messageBytes <= 70)
62 return false;
63 switch (type)
64 {
65 case protocol::mtMANIFESTS:
66 case protocol::mtENDPOINTS:
67 case protocol::mtTRANSACTION:
68 case protocol::mtGET_LEDGER:
69 case protocol::mtLEDGER_DATA:
70 case protocol::mtGET_OBJECTS:
71 case protocol::mtVALIDATORLIST:
72 case protocol::mtVALIDATORLISTCOLLECTION:
73 case protocol::mtREPLAY_DELTA_RESPONSE:
74 case protocol::mtTRANSACTIONS:
75 return true;
76 case protocol::mtPING:
77 case protocol::mtCLUSTER:
78 case protocol::mtPROPOSE_LEDGER:
79 case protocol::mtSTATUS_CHANGE:
80 case protocol::mtHAVE_SET:
81 case protocol::mtVALIDATION:
82 case protocol::mtPROOF_PATH_REQ:
83 case protocol::mtPROOF_PATH_RESPONSE:
84 case protocol::mtREPLAY_DELTA_REQ:
85 case protocol::mtHAVE_TRANSACTIONS:
86 break;
87 }
88 return false;
89 }();
90
91 if (compressible)
92 {
93 auto payload = static_cast<void const*>(buffer_.data() + headerBytes);
94
95 auto compressedSize = ripple::compression::compress(
96 payload,
97 messageBytes,
98 [&](std::size_t inSize) { // size of required compressed buffer
99 bufferCompressed_.resize(inSize + headerBytesCompressed);
100 return (bufferCompressed_.data() + headerBytesCompressed);
101 });
102
103 if (compressedSize <
104 (messageBytes - (headerBytesCompressed - headerBytes)))
105 {
106 bufferCompressed_.resize(headerBytesCompressed + compressedSize);
107 setHeader(
109 compressedSize,
110 type,
111 Algorithm::LZ4,
112 messageBytes);
113 }
114 else
116 }
117}
118
154void
157 std::uint32_t payloadBytes,
158 int type,
159 Algorithm compression,
160 std::uint32_t uncompressedBytes)
161{
162 auto h = in;
163
164 auto pack = [](std::uint8_t*& in, std::uint32_t size) {
165 *in++ = static_cast<std::uint8_t>(
166 (size >> 24) & 0x0F); // leftmost 4 are compression bits
167 *in++ = static_cast<std::uint8_t>((size >> 16) & 0xFF);
168 *in++ = static_cast<std::uint8_t>((size >> 8) & 0xFF);
169 *in++ = static_cast<std::uint8_t>(size & 0xFF);
170 };
171
172 pack(in, payloadBytes);
173
174 *in++ = static_cast<std::uint8_t>((type >> 8) & 0xFF);
175 *in++ = static_cast<std::uint8_t>(type & 0xFF);
176
177 if (compression != Algorithm::None)
178 {
179 pack(in, uncompressedBytes);
180 *h |= static_cast<std::uint8_t>(compression);
181 }
182}
183
186{
187 return buffer_.size();
188}
189
192{
193 if (tryCompressed == Compressed::Off)
194 return buffer_;
195
197
198 if (bufferCompressed_.size() > 0)
199 return bufferCompressed_;
200 else
201 return buffer_;
202}
203
204int
206{
207 int type = (static_cast<int>(*(in + 4)) << 8) + *(in + 5);
208 return type;
209}
210
211} // namespace ripple
T call_once(T... args)
std::once_flag once_flag_
Definition Message.h:84
void setHeader(std::uint8_t *in, std::uint32_t payloadBytes, int type, Algorithm compression, std::uint32_t uncompressedBytes)
Set the payload header.
Definition Message.cpp:155
std::vector< uint8_t > const & getBuffer(Compressed tryCompressed)
Retrieve the packed message data.
Definition Message.cpp:191
std::size_t getBufferSize()
Retrieve the size of the packed but uncompressed message data.
Definition Message.cpp:185
static std::size_t totalSize(::google::protobuf::Message const &message)
Definition Message.cpp:47
Message(::google::protobuf::Message const &message, protocol::MessageType type, std::optional< PublicKey > const &validator={})
Constructor.
Definition Message.cpp:8
std::vector< uint8_t > buffer_
Definition Message.h:81
std::vector< uint8_t > bufferCompressed_
Definition Message.h:82
int getType(std::uint8_t const *in) const
Get the message type from the payload header.
Definition Message.cpp:205
void compress()
Try to compress the payload.
Definition Message.cpp:53
static std::size_t messageSize(::google::protobuf::Message const &message)
Definition Message.cpp:36
TrafficCount is used to count ingress and egress wire bytes and number of messages.
T data(T... args)
std::size_t constexpr headerBytes
Definition Compression.h:11
std::size_t compress(void const *in, std::size_t inSize, BufferFactory &&bf, Algorithm algorithm=Algorithm::LZ4)
Compress input data.
Definition Compression.h:71
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
T resize(T... args)
T size(T... args)