rippled
Loading...
Searching...
No Matches
ProtocolMessage.h
1//------------------------------------------------------------------------------
2/*
3 This file is part of rippled: https://github.com/ripple/rippled
4 Copyright (c) 2012, 2013 Ripple Labs Inc.
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#ifndef RIPPLE_OVERLAY_PROTOCOLMESSAGE_H_INCLUDED
21#define RIPPLE_OVERLAY_PROTOCOLMESSAGE_H_INCLUDED
22
23#include <xrpld/overlay/Compression.h>
24#include <xrpld/overlay/Message.h>
25#include <xrpld/overlay/detail/ZeroCopyStream.h>
26
27#include <xrpl/beast/utility/instrumentation.h>
28#include <xrpl/protocol/messages.h>
29
30#include <boost/asio/buffer.hpp>
31#include <boost/asio/buffers_iterator.hpp>
32
33#include <cstdint>
34#include <optional>
35#include <type_traits>
36#include <vector>
37
38namespace ripple {
39
40inline protocol::MessageType
41protocolMessageType(protocol::TMGetLedger const&)
42{
43 return protocol::mtGET_LEDGER;
44}
45
46inline protocol::MessageType
47protocolMessageType(protocol::TMReplayDeltaRequest const&)
48{
49 return protocol::mtREPLAY_DELTA_REQ;
50}
51
52inline protocol::MessageType
53protocolMessageType(protocol::TMProofPathRequest const&)
54{
55 return protocol::mtPROOF_PATH_REQ;
56}
57
59template <class = void>
62{
63 switch (type)
64 {
65 case protocol::mtMANIFESTS:
66 return "manifests";
67 case protocol::mtPING:
68 return "ping";
69 case protocol::mtCLUSTER:
70 return "cluster";
71 case protocol::mtENDPOINTS:
72 return "endpoints";
73 case protocol::mtTRANSACTION:
74 return "tx";
75 case protocol::mtGET_LEDGER:
76 return "get_ledger";
77 case protocol::mtLEDGER_DATA:
78 return "ledger_data";
79 case protocol::mtPROPOSE_LEDGER:
80 return "propose";
81 case protocol::mtSTATUS_CHANGE:
82 return "status";
83 case protocol::mtHAVE_SET:
84 return "have_set";
85 case protocol::mtVALIDATORLIST:
86 return "validator_list";
87 case protocol::mtVALIDATORLISTCOLLECTION:
88 return "validator_list_collection";
89 case protocol::mtVALIDATION:
90 return "validation";
91 case protocol::mtGET_OBJECTS:
92 return "get_objects";
93 case protocol::mtHAVE_TRANSACTIONS:
94 return "have_transactions";
95 case protocol::mtTRANSACTIONS:
96 return "transactions";
97 case protocol::mtSQUELCH:
98 return "squelch";
99 case protocol::mtPROOF_PATH_REQ:
100 return "proof_path_request";
101 case protocol::mtPROOF_PATH_RESPONSE:
102 return "proof_path_response";
103 case protocol::mtREPLAY_DELTA_REQ:
104 return "replay_delta_request";
105 case protocol::mtREPLAY_DELTA_RESPONSE:
106 return "replay_delta_response";
107 default:
108 break;
109 }
110 return "unknown";
111}
112
113namespace detail {
114
141
142template <typename BufferSequence>
143auto
144buffersBegin(BufferSequence const& bufs)
145{
146 return boost::asio::buffers_iterator<BufferSequence, std::uint8_t>::begin(
147 bufs);
148}
149
150template <typename BufferSequence>
151auto
152buffersEnd(BufferSequence const& bufs)
153{
154 return boost::asio::buffers_iterator<BufferSequence, std::uint8_t>::end(
155 bufs);
156}
157
167template <class BufferSequence>
170 boost::system::error_code& ec,
171 BufferSequence const& bufs,
172 std::size_t size)
173{
174 using namespace ripple::compression;
175
176 MessageHeader hdr;
177 auto iter = buffersBegin(bufs);
178 XRPL_ASSERT(
179 iter != buffersEnd(bufs),
180 "ripple::detail::parseMessageHeader : non-empty buffer");
181
182 // Check valid header compressed message:
183 // - 4 bits are the compression algorithm, 1st bit is always set to 1
184 // - 2 bits are always set to 0
185 // - 26 bits are the payload size
186 // - 32 bits are the uncompressed data size
187 if (*iter & 0x80)
188 {
189 hdr.header_size = headerBytesCompressed;
190
191 // not enough bytes to parse the header
192 if (size < hdr.header_size)
193 {
194 ec = make_error_code(boost::system::errc::success);
195 return std::nullopt;
196 }
197
198 if (*iter & 0x0C)
199 {
200 ec = make_error_code(boost::system::errc::protocol_error);
201 return std::nullopt;
202 }
203
204 hdr.algorithm = static_cast<compression::Algorithm>(*iter & 0xF0);
205
207 {
208 ec = make_error_code(boost::system::errc::protocol_error);
209 return std::nullopt;
210 }
211
212 for (int i = 0; i != 4; ++i)
213 hdr.payload_wire_size = (hdr.payload_wire_size << 8) + *iter++;
214
215 // clear the top four bits (the compression bits).
216 hdr.payload_wire_size &= 0x0FFFFFFF;
217
219
220 for (int i = 0; i != 2; ++i)
221 hdr.message_type = (hdr.message_type << 8) + *iter++;
222
223 for (int i = 0; i != 4; ++i)
224 hdr.uncompressed_size = (hdr.uncompressed_size << 8) + *iter++;
225
226 return hdr;
227 }
228
229 // Check valid header uncompressed message:
230 // - 6 bits are set to 0
231 // - 26 bits are the payload size
232 if ((*iter & 0xFC) == 0)
233 {
234 hdr.header_size = headerBytes;
235
236 if (size < hdr.header_size)
237 {
238 ec = make_error_code(boost::system::errc::success);
239 return std::nullopt;
240 }
241
242 hdr.algorithm = Algorithm::None;
243
244 for (int i = 0; i != 4; ++i)
245 hdr.payload_wire_size = (hdr.payload_wire_size << 8) + *iter++;
246
249
250 for (int i = 0; i != 2; ++i)
251 hdr.message_type = (hdr.message_type << 8) + *iter++;
252
253 return hdr;
254 }
255
256 ec = make_error_code(boost::system::errc::no_message);
257 return std::nullopt;
258}
259
260template <
261 class T,
262 class Buffers,
263 class = std::enable_if_t<
266parseMessageContent(MessageHeader const& header, Buffers const& buffers)
267{
268 auto const m = std::make_shared<T>();
269
270 ZeroCopyInputStream<Buffers> stream(buffers);
271 stream.Skip(header.header_size);
272
274 {
276 payload.resize(header.uncompressed_size);
277
278 auto const payloadSize = ripple::compression::decompress(
279 stream,
280 header.payload_wire_size,
281 payload.data(),
282 header.uncompressed_size,
283 header.algorithm);
284
285 if (payloadSize == 0 || !m->ParseFromArray(payload.data(), payloadSize))
286 return {};
287 }
288 else if (!m->ParseFromZeroCopyStream(&stream))
289 return {};
290
291 return m;
292}
293
294template <
295 class T,
296 class Buffers,
297 class Handler,
298 class = std::enable_if_t<
300bool
301invoke(MessageHeader const& header, Buffers const& buffers, Handler& handler)
302{
303 auto const m = parseMessageContent<T>(header, buffers);
304 if (!m)
305 return false;
306
307 using namespace ripple::compression;
308 handler.onMessageBegin(
309 header.message_type,
310 m,
311 header.payload_wire_size,
312 header.uncompressed_size,
313 header.algorithm != Algorithm::None);
314 handler.onMessage(m);
315 handler.onMessageEnd(header.message_type, m);
316
317 return true;
318}
319
320} // namespace detail
321
334template <class Buffers, class Handler>
337 Buffers const& buffers,
338 Handler& handler,
339 std::size_t& hint)
340{
342
343 auto const size = boost::asio::buffer_size(buffers);
344
345 if (size == 0)
346 return result;
347
348 auto header = detail::parseMessageHeader(result.second, buffers, size);
349
350 // If we can't parse the header then it may be that we don't have enough
351 // bytes yet, or because the message was cut off (if error_code is success).
352 // Otherwise we failed to match the header's marker (error_code is set to
353 // no_message) or the compression algorithm is invalid (error_code is
354 // protocol_error) and signal an error.
355 if (!header)
356 return result;
357
358 // We implement a maximum size for protocol messages. Sending a message
359 // whose size exceeds this may result in the connection being dropped. A
360 // larger message size may be supported in the future or negotiated as
361 // part of a protocol upgrade.
362 if (header->payload_wire_size > maximiumMessageSize ||
363 header->uncompressed_size > maximiumMessageSize)
364 {
365 result.second = make_error_code(boost::system::errc::message_size);
366 return result;
367 }
368
369 // We requested uncompressed messages from the peer but received compressed.
370 if (!handler.compressionEnabled() &&
371 header->algorithm != compression::Algorithm::None)
372 {
373 result.second = make_error_code(boost::system::errc::protocol_error);
374 return result;
375 }
376
377 // We don't have the whole message yet. This isn't an error but we have
378 // nothing to do.
379 if (header->total_wire_size > size)
380 {
381 hint = header->total_wire_size - size;
382 return result;
383 }
384
385 bool success;
386
387 switch (header->message_type)
388 {
389 case protocol::mtMANIFESTS:
390 success = detail::invoke<protocol::TMManifests>(
391 *header, buffers, handler);
392 break;
393 case protocol::mtPING:
394 success =
395 detail::invoke<protocol::TMPing>(*header, buffers, handler);
396 break;
397 case protocol::mtCLUSTER:
398 success =
399 detail::invoke<protocol::TMCluster>(*header, buffers, handler);
400 break;
401 case protocol::mtENDPOINTS:
402 success = detail::invoke<protocol::TMEndpoints>(
403 *header, buffers, handler);
404 break;
405 case protocol::mtTRANSACTION:
406 success = detail::invoke<protocol::TMTransaction>(
407 *header, buffers, handler);
408 break;
409 case protocol::mtGET_LEDGER:
410 success = detail::invoke<protocol::TMGetLedger>(
411 *header, buffers, handler);
412 break;
413 case protocol::mtLEDGER_DATA:
414 success = detail::invoke<protocol::TMLedgerData>(
415 *header, buffers, handler);
416 break;
417 case protocol::mtPROPOSE_LEDGER:
418 success = detail::invoke<protocol::TMProposeSet>(
419 *header, buffers, handler);
420 break;
421 case protocol::mtSTATUS_CHANGE:
422 success = detail::invoke<protocol::TMStatusChange>(
423 *header, buffers, handler);
424 break;
425 case protocol::mtHAVE_SET:
426 success = detail::invoke<protocol::TMHaveTransactionSet>(
427 *header, buffers, handler);
428 break;
429 case protocol::mtVALIDATION:
430 success = detail::invoke<protocol::TMValidation>(
431 *header, buffers, handler);
432 break;
433 case protocol::mtVALIDATORLIST:
434 success = detail::invoke<protocol::TMValidatorList>(
435 *header, buffers, handler);
436 break;
437 case protocol::mtVALIDATORLISTCOLLECTION:
438 success = detail::invoke<protocol::TMValidatorListCollection>(
439 *header, buffers, handler);
440 break;
441 case protocol::mtGET_OBJECTS:
442 success = detail::invoke<protocol::TMGetObjectByHash>(
443 *header, buffers, handler);
444 break;
445 case protocol::mtHAVE_TRANSACTIONS:
446 success = detail::invoke<protocol::TMHaveTransactions>(
447 *header, buffers, handler);
448 break;
449 case protocol::mtTRANSACTIONS:
450 success = detail::invoke<protocol::TMTransactions>(
451 *header, buffers, handler);
452 break;
453 case protocol::mtSQUELCH:
454 success =
455 detail::invoke<protocol::TMSquelch>(*header, buffers, handler);
456 break;
457 case protocol::mtPROOF_PATH_REQ:
458 success = detail::invoke<protocol::TMProofPathRequest>(
459 *header, buffers, handler);
460 break;
461 case protocol::mtPROOF_PATH_RESPONSE:
462 success = detail::invoke<protocol::TMProofPathResponse>(
463 *header, buffers, handler);
464 break;
465 case protocol::mtREPLAY_DELTA_REQ:
466 success = detail::invoke<protocol::TMReplayDeltaRequest>(
467 *header, buffers, handler);
468 break;
469 case protocol::mtREPLAY_DELTA_RESPONSE:
470 success = detail::invoke<protocol::TMReplayDeltaResponse>(
471 *header, buffers, handler);
472 break;
473 default:
474 handler.onMessageUnknown(header->message_type);
475 success = true;
476 break;
477 }
478
479 result.first = header->total_wire_size;
480
481 if (!success)
482 result.second = make_error_code(boost::system::errc::bad_message);
483
484 return result;
485}
486
487} // namespace ripple
488
489#endif
Implements ZeroCopyInputStream around a buffer sequence.
T data(T... args)
T is_same_v
std::size_t decompress(InputStream &in, std::size_t inSize, std::uint8_t *decompressed, std::size_t decompressedSize, Algorithm algorithm=Algorithm::LZ4)
Decompress input stream.
Definition Compression.h:49
std::shared_ptr< T > parseMessageContent(MessageHeader const &header, Buffers const &buffers)
bool invoke(MessageHeader const &header, Buffers const &buffers, Handler &handler)
auto buffersBegin(BufferSequence const &bufs)
std::optional< MessageHeader > parseMessageHeader(boost::system::error_code &ec, BufferSequence const &bufs, std::size_t size)
Parse a message header.
auto buffersEnd(BufferSequence const &bufs)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:25
std::string protocolMessageName(int type)
Returns the name of a protocol message given its type.
protocol::MessageType protocolMessageType(protocol::TMGetLedger const &)
std::pair< std::size_t, boost::system::error_code > invokeProtocolMessage(Buffers const &buffers, Handler &handler, std::size_t &hint)
Calls the handler for up to one protocol message in the passed buffers.
std::error_code make_error_code(ripple::TokenCodecErrc e)
constexpr std::size_t maximiumMessageSize
Definition Message.h:34
T resize(T... args)
std::uint32_t header_size
The size of the header associated with this message.
compression::Algorithm algorithm
Indicates which compression algorithm the payload is compressed with.
std::uint16_t message_type
The type of the message.
std::uint32_t payload_wire_size
The size of the payload on the wire.
std::uint32_t total_wire_size
The size of the message on the wire.
std::uint32_t uncompressed_size
Uncompressed message size if the message is compressed.