rippled
Loading...
Searching...
No Matches
ProtocolMessage.h
1#pragma once
2
3#include <xrpld/overlay/Compression.h>
4#include <xrpld/overlay/Message.h>
5#include <xrpld/overlay/detail/ZeroCopyStream.h>
6
7#include <xrpl/beast/utility/instrumentation.h>
8#include <xrpl/protocol/messages.h>
9
10#include <boost/asio/buffer.hpp>
11#include <boost/asio/buffers_iterator.hpp>
12
13#include <cstdint>
14#include <optional>
15#include <type_traits>
16#include <vector>
17
18namespace xrpl {
19
20inline protocol::MessageType
21protocolMessageType(protocol::TMGetLedger const&)
22{
23 return protocol::mtGET_LEDGER;
24}
25
26inline protocol::MessageType
27protocolMessageType(protocol::TMReplayDeltaRequest const&)
28{
29 return protocol::mtREPLAY_DELTA_REQ;
30}
31
32inline protocol::MessageType
33protocolMessageType(protocol::TMProofPathRequest const&)
34{
35 return protocol::mtPROOF_PATH_REQ;
36}
37
39template <class = void>
42{
43 switch (type)
44 {
45 case protocol::mtMANIFESTS:
46 return "manifests";
47 case protocol::mtPING:
48 return "ping";
49 case protocol::mtCLUSTER:
50 return "cluster";
51 case protocol::mtENDPOINTS:
52 return "endpoints";
53 case protocol::mtTRANSACTION:
54 return "tx";
55 case protocol::mtGET_LEDGER:
56 return "get_ledger";
57 case protocol::mtLEDGER_DATA:
58 return "ledger_data";
59 case protocol::mtPROPOSE_LEDGER:
60 return "propose";
61 case protocol::mtSTATUS_CHANGE:
62 return "status";
63 case protocol::mtHAVE_SET:
64 return "have_set";
65 case protocol::mtVALIDATOR_LIST:
66 return "validator_list";
67 case protocol::mtVALIDATOR_LIST_COLLECTION:
68 return "validator_list_collection";
69 case protocol::mtVALIDATION:
70 return "validation";
71 case protocol::mtGET_OBJECTS:
72 return "get_objects";
73 case protocol::mtHAVE_TRANSACTIONS:
74 return "have_transactions";
75 case protocol::mtTRANSACTIONS:
76 return "transactions";
77 case protocol::mtSQUELCH:
78 return "squelch";
79 case protocol::mtPROOF_PATH_REQ:
80 return "proof_path_request";
81 case protocol::mtPROOF_PATH_RESPONSE:
82 return "proof_path_response";
83 case protocol::mtREPLAY_DELTA_REQ:
84 return "replay_delta_request";
85 case protocol::mtREPLAY_DELTA_RESPONSE:
86 return "replay_delta_response";
87 default:
88 break;
89 }
90 return "unknown";
91}
92
93namespace detail {
94
121
122template <typename BufferSequence>
123auto
124buffersBegin(BufferSequence const& bufs)
125{
126 return boost::asio::buffers_iterator<BufferSequence, std::uint8_t>::begin(bufs);
127}
128
129template <typename BufferSequence>
130auto
131buffersEnd(BufferSequence const& bufs)
132{
133 return boost::asio::buffers_iterator<BufferSequence, std::uint8_t>::end(bufs);
134}
135
145template <class BufferSequence>
147parseMessageHeader(boost::system::error_code& ec, BufferSequence const& bufs, std::size_t size)
148{
149 using namespace xrpl::compression;
150
151 MessageHeader hdr;
152 auto iter = buffersBegin(bufs);
153 XRPL_ASSERT(iter != buffersEnd(bufs), "xrpl::detail::parseMessageHeader : non-empty buffer");
154
155 // Check valid header compressed message:
156 // - 4 bits are the compression algorithm, 1st bit is always set to 1
157 // - 2 bits are always set to 0
158 // - 26 bits are the payload size
159 // - 32 bits are the uncompressed data size
160 if (*iter & 0x80)
161 {
162 hdr.header_size = headerBytesCompressed;
163
164 // not enough bytes to parse the header
165 if (size < hdr.header_size)
166 {
167 ec = make_error_code(boost::system::errc::success);
168 return std::nullopt;
169 }
170
171 if (*iter & 0x0C)
172 {
173 ec = make_error_code(boost::system::errc::protocol_error);
174 return std::nullopt;
175 }
176
177 hdr.algorithm = static_cast<compression::Algorithm>(*iter & 0xF0);
178
180 {
181 ec = make_error_code(boost::system::errc::protocol_error);
182 return std::nullopt;
183 }
184
185 for (int i = 0; i != 4; ++i)
186 hdr.payload_wire_size = (hdr.payload_wire_size << 8) + *iter++;
187
188 // clear the top four bits (the compression bits).
189 hdr.payload_wire_size &= 0x0FFFFFFF;
190
192
193 for (int i = 0; i != 2; ++i)
194 hdr.message_type = (hdr.message_type << 8) + *iter++;
195
196 for (int i = 0; i != 4; ++i)
197 hdr.uncompressed_size = (hdr.uncompressed_size << 8) + *iter++;
198
199 return hdr;
200 }
201
202 // Check valid header uncompressed message:
203 // - 6 bits are set to 0
204 // - 26 bits are the payload size
205 if ((*iter & 0xFC) == 0)
206 {
207 hdr.header_size = headerBytes;
208
209 if (size < hdr.header_size)
210 {
211 ec = make_error_code(boost::system::errc::success);
212 return std::nullopt;
213 }
214
215 hdr.algorithm = Algorithm::None;
216
217 for (int i = 0; i != 4; ++i)
218 hdr.payload_wire_size = (hdr.payload_wire_size << 8) + *iter++;
219
222
223 for (int i = 0; i != 2; ++i)
224 hdr.message_type = (hdr.message_type << 8) + *iter++;
225
226 return hdr;
227 }
228
229 ec = make_error_code(boost::system::errc::no_message);
230 return std::nullopt;
231}
232
233template <class T, class Buffers, class = std::enable_if_t<std::is_base_of<::google::protobuf::Message, T>::value>>
235parseMessageContent(MessageHeader const& header, Buffers const& buffers)
236{
237 auto const m = std::make_shared<T>();
238
239 ZeroCopyInputStream<Buffers> stream(buffers);
240 stream.Skip(header.header_size);
241
243 {
245 payload.resize(header.uncompressed_size);
246
247 auto const payloadSize = xrpl::compression::decompress(
248 stream, header.payload_wire_size, payload.data(), header.uncompressed_size, header.algorithm);
249
250 if (payloadSize == 0 || !m->ParseFromArray(payload.data(), payloadSize))
251 return {};
252 }
253 else if (!m->ParseFromZeroCopyStream(&stream))
254 return {};
255
256 return m;
257}
258
259template <
260 class T,
261 class Buffers,
262 class Handler,
264bool
265invoke(MessageHeader const& header, Buffers const& buffers, Handler& handler)
266{
267 auto const m = parseMessageContent<T>(header, buffers);
268 if (!m)
269 return false;
270
271 using namespace xrpl::compression;
272 handler.onMessageBegin(
273 header.message_type,
274 m,
275 header.payload_wire_size,
276 header.uncompressed_size,
277 header.algorithm != Algorithm::None);
278 handler.onMessage(m);
279 handler.onMessageEnd(header.message_type, m);
280
281 return true;
282}
283
284} // namespace detail
285
298template <class Buffers, class Handler>
300invokeProtocolMessage(Buffers const& buffers, Handler& handler, std::size_t& hint)
301{
303
304 auto const size = boost::asio::buffer_size(buffers);
305
306 if (size == 0)
307 return result;
308
309 auto header = detail::parseMessageHeader(result.second, buffers, size);
310
311 // If we can't parse the header then it may be that we don't have enough
312 // bytes yet, or because the message was cut off (if error_code is success).
313 // Otherwise we failed to match the header's marker (error_code is set to
314 // no_message) or the compression algorithm is invalid (error_code is
315 // protocol_error) and signal an error.
316 if (!header)
317 return result;
318
319 // We implement a maximum size for protocol messages. Sending a message
320 // whose size exceeds this may result in the connection being dropped. A
321 // larger message size may be supported in the future or negotiated as
322 // part of a protocol upgrade.
323 if (header->payload_wire_size > maximumMessageSize || header->uncompressed_size > maximumMessageSize)
324 {
325 result.second = make_error_code(boost::system::errc::message_size);
326 return result;
327 }
328
329 // We requested uncompressed messages from the peer but received compressed.
330 if (!handler.compressionEnabled() && header->algorithm != compression::Algorithm::None)
331 {
332 result.second = make_error_code(boost::system::errc::protocol_error);
333 return result;
334 }
335
336 // We don't have the whole message yet. This isn't an error but we have
337 // nothing to do.
338 if (header->total_wire_size > size)
339 {
340 hint = header->total_wire_size - size;
341 return result;
342 }
343
344 bool success;
345
346 switch (header->message_type)
347 {
348 case protocol::mtMANIFESTS:
349 success = detail::invoke<protocol::TMManifests>(*header, buffers, handler);
350 break;
351 case protocol::mtPING:
352 success = detail::invoke<protocol::TMPing>(*header, buffers, handler);
353 break;
354 case protocol::mtCLUSTER:
355 success = detail::invoke<protocol::TMCluster>(*header, buffers, handler);
356 break;
357 case protocol::mtENDPOINTS:
358 success = detail::invoke<protocol::TMEndpoints>(*header, buffers, handler);
359 break;
360 case protocol::mtTRANSACTION:
361 success = detail::invoke<protocol::TMTransaction>(*header, buffers, handler);
362 break;
363 case protocol::mtGET_LEDGER:
364 success = detail::invoke<protocol::TMGetLedger>(*header, buffers, handler);
365 break;
366 case protocol::mtLEDGER_DATA:
367 success = detail::invoke<protocol::TMLedgerData>(*header, buffers, handler);
368 break;
369 case protocol::mtPROPOSE_LEDGER:
370 success = detail::invoke<protocol::TMProposeSet>(*header, buffers, handler);
371 break;
372 case protocol::mtSTATUS_CHANGE:
373 success = detail::invoke<protocol::TMStatusChange>(*header, buffers, handler);
374 break;
375 case protocol::mtHAVE_SET:
376 success = detail::invoke<protocol::TMHaveTransactionSet>(*header, buffers, handler);
377 break;
378 case protocol::mtVALIDATION:
379 success = detail::invoke<protocol::TMValidation>(*header, buffers, handler);
380 break;
381 case protocol::mtVALIDATOR_LIST:
382 success = detail::invoke<protocol::TMValidatorList>(*header, buffers, handler);
383 break;
384 case protocol::mtVALIDATOR_LIST_COLLECTION:
385 success = detail::invoke<protocol::TMValidatorListCollection>(*header, buffers, handler);
386 break;
387 case protocol::mtGET_OBJECTS:
388 success = detail::invoke<protocol::TMGetObjectByHash>(*header, buffers, handler);
389 break;
390 case protocol::mtHAVE_TRANSACTIONS:
391 success = detail::invoke<protocol::TMHaveTransactions>(*header, buffers, handler);
392 break;
393 case protocol::mtTRANSACTIONS:
394 success = detail::invoke<protocol::TMTransactions>(*header, buffers, handler);
395 break;
396 case protocol::mtSQUELCH:
397 success = detail::invoke<protocol::TMSquelch>(*header, buffers, handler);
398 break;
399 case protocol::mtPROOF_PATH_REQ:
400 success = detail::invoke<protocol::TMProofPathRequest>(*header, buffers, handler);
401 break;
402 case protocol::mtPROOF_PATH_RESPONSE:
403 success = detail::invoke<protocol::TMProofPathResponse>(*header, buffers, handler);
404 break;
405 case protocol::mtREPLAY_DELTA_REQ:
406 success = detail::invoke<protocol::TMReplayDeltaRequest>(*header, buffers, handler);
407 break;
408 case protocol::mtREPLAY_DELTA_RESPONSE:
409 success = detail::invoke<protocol::TMReplayDeltaResponse>(*header, buffers, handler);
410 break;
411 default:
412 handler.onMessageUnknown(header->message_type);
413 success = true;
414 break;
415 }
416
417 result.first = header->total_wire_size;
418
419 if (!success)
420 result.second = make_error_code(boost::system::errc::bad_message);
421
422 return result;
423}
424
425} // namespace xrpl
Implements ZeroCopyInputStream around a buffer sequence.
T data(T... args)
T is_same_v
std::size_t decompress(InputStream &in, std::size_t inSize, std::uint8_t *decompressed, std::size_t decompressedSize, Algorithm algorithm=Algorithm::LZ4)
Decompress input stream.
Definition Compression.h:29
bool invoke(MessageHeader const &header, Buffers const &buffers, Handler &handler)
std::optional< MessageHeader > parseMessageHeader(boost::system::error_code &ec, BufferSequence const &bufs, std::size_t size)
Parse a message header.
std::shared_ptr< T > parseMessageContent(MessageHeader const &header, Buffers const &buffers)
auto buffersBegin(BufferSequence const &bufs)
auto buffersEnd(BufferSequence const &bufs)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
std::error_code make_error_code(xrpl::TokenCodecErrc e)
protocol::MessageType protocolMessageType(protocol::TMGetLedger const &)
std::pair< std::size_t, boost::system::error_code > invokeProtocolMessage(Buffers const &buffers, Handler &handler, std::size_t &hint)
Calls the handler for up to one protocol message in the passed buffers.
constexpr std::size_t maximumMessageSize
Definition Message.h:14
std::string protocolMessageName(int type)
Returns the name of a protocol message given its type.
T resize(T... args)
std::uint32_t header_size
The size of the header associated with this message.
std::uint32_t uncompressed_size
Uncompressed message size if the message is compressed.
std::uint32_t total_wire_size
The size of the message on the wire.
compression::Algorithm algorithm
Indicates which compression algorithm the payload is compressed with.
std::uint32_t payload_wire_size
The size of the payload on the wire.
std::uint16_t message_type
The type of the message.