rippled
Loading...
Searching...
No Matches
ProtocolMessage.h
1#pragma once
2
3#include <xrpld/overlay/Compression.h>
4#include <xrpld/overlay/Message.h>
5#include <xrpld/overlay/detail/ZeroCopyStream.h>
6
7#include <xrpl/beast/utility/instrumentation.h>
8#include <xrpl/protocol/messages.h>
9
10#include <boost/asio/buffer.hpp>
11#include <boost/asio/buffers_iterator.hpp>
12
13#include <cstdint>
14#include <optional>
15#include <type_traits>
16#include <vector>
17
18namespace xrpl {
19
20inline protocol::MessageType
21protocolMessageType(protocol::TMGetLedger const&)
22{
23 return protocol::mtGET_LEDGER;
24}
25
26inline protocol::MessageType
27protocolMessageType(protocol::TMReplayDeltaRequest const&)
28{
29 return protocol::mtREPLAY_DELTA_REQ;
30}
31
32inline protocol::MessageType
33protocolMessageType(protocol::TMProofPathRequest const&)
34{
35 return protocol::mtPROOF_PATH_REQ;
36}
37
39template <class = void>
42{
43 switch (type)
44 {
45 case protocol::mtMANIFESTS:
46 return "manifests";
47 case protocol::mtPING:
48 return "ping";
49 case protocol::mtCLUSTER:
50 return "cluster";
51 case protocol::mtENDPOINTS:
52 return "endpoints";
53 case protocol::mtTRANSACTION:
54 return "tx";
55 case protocol::mtGET_LEDGER:
56 return "get_ledger";
57 case protocol::mtLEDGER_DATA:
58 return "ledger_data";
59 case protocol::mtPROPOSE_LEDGER:
60 return "propose";
61 case protocol::mtSTATUS_CHANGE:
62 return "status";
63 case protocol::mtHAVE_SET:
64 return "have_set";
65 case protocol::mtVALIDATOR_LIST:
66 return "validator_list";
67 case protocol::mtVALIDATOR_LIST_COLLECTION:
68 return "validator_list_collection";
69 case protocol::mtVALIDATION:
70 return "validation";
71 case protocol::mtGET_OBJECTS:
72 return "get_objects";
73 case protocol::mtHAVE_TRANSACTIONS:
74 return "have_transactions";
75 case protocol::mtTRANSACTIONS:
76 return "transactions";
77 case protocol::mtSQUELCH:
78 return "squelch";
79 case protocol::mtPROOF_PATH_REQ:
80 return "proof_path_request";
81 case protocol::mtPROOF_PATH_RESPONSE:
82 return "proof_path_response";
83 case protocol::mtREPLAY_DELTA_REQ:
84 return "replay_delta_request";
85 case protocol::mtREPLAY_DELTA_RESPONSE:
86 return "replay_delta_response";
87 default:
88 break;
89 }
90 return "unknown";
91}
92
93namespace detail {
94
121
122template <typename BufferSequence>
123auto
124buffersBegin(BufferSequence const& bufs)
125{
126 return boost::asio::buffers_iterator<BufferSequence, std::uint8_t>::begin(bufs);
127}
128
129template <typename BufferSequence>
130auto
131buffersEnd(BufferSequence const& bufs)
132{
133 return boost::asio::buffers_iterator<BufferSequence, std::uint8_t>::end(bufs);
134}
135
145template <class BufferSequence>
147parseMessageHeader(boost::system::error_code& ec, BufferSequence const& bufs, std::size_t size)
148{
149 using namespace xrpl::compression;
150
151 MessageHeader hdr;
152 auto iter = buffersBegin(bufs);
153 XRPL_ASSERT(iter != buffersEnd(bufs), "xrpl::detail::parseMessageHeader : non-empty buffer");
154
155 // Check valid header compressed message:
156 // - 4 bits are the compression algorithm, 1st bit is always set to 1
157 // - 2 bits are always set to 0
158 // - 26 bits are the payload size
159 // - 32 bits are the uncompressed data size
160 if (*iter & 0x80)
161 {
162 hdr.header_size = headerBytesCompressed;
163
164 // not enough bytes to parse the header
165 if (size < hdr.header_size)
166 {
167 ec = make_error_code(boost::system::errc::success);
168 return std::nullopt;
169 }
170
171 if (*iter & 0x0C)
172 {
173 ec = make_error_code(boost::system::errc::protocol_error);
174 return std::nullopt;
175 }
176
177 hdr.algorithm = static_cast<compression::Algorithm>(*iter & 0xF0);
178
180 {
181 ec = make_error_code(boost::system::errc::protocol_error);
182 return std::nullopt;
183 }
184
185 for (int i = 0; i != 4; ++i)
186 hdr.payload_wire_size = (hdr.payload_wire_size << 8) + *iter++;
187
188 // clear the top four bits (the compression bits).
189 hdr.payload_wire_size &= 0x0FFFFFFF;
190
192
193 for (int i = 0; i != 2; ++i)
194 hdr.message_type = (hdr.message_type << 8) + *iter++;
195
196 for (int i = 0; i != 4; ++i)
197 hdr.uncompressed_size = (hdr.uncompressed_size << 8) + *iter++;
198
199 return hdr;
200 }
201
202 // Check valid header uncompressed message:
203 // - 6 bits are set to 0
204 // - 26 bits are the payload size
205 if ((*iter & 0xFC) == 0)
206 {
207 hdr.header_size = headerBytes;
208
209 if (size < hdr.header_size)
210 {
211 ec = make_error_code(boost::system::errc::success);
212 return std::nullopt;
213 }
214
215 hdr.algorithm = Algorithm::None;
216
217 for (int i = 0; i != 4; ++i)
218 hdr.payload_wire_size = (hdr.payload_wire_size << 8) + *iter++;
219
222
223 for (int i = 0; i != 2; ++i)
224 hdr.message_type = (hdr.message_type << 8) + *iter++;
225
226 return hdr;
227 }
228
229 ec = make_error_code(boost::system::errc::no_message);
230 return std::nullopt;
231}
232
233template <
234 class T,
235 class Buffers,
238parseMessageContent(MessageHeader const& header, Buffers const& buffers)
239{
240 auto m = std::make_shared<T>();
241
242 ZeroCopyInputStream<Buffers> stream(buffers);
243 stream.Skip(header.header_size);
244
246 {
248 payload.resize(header.uncompressed_size);
249
250 auto const payloadSize = xrpl::compression::decompress(
251 stream,
252 header.payload_wire_size,
253 payload.data(),
254 header.uncompressed_size,
255 header.algorithm);
256
257 if (payloadSize == 0 || !m->ParseFromArray(payload.data(), payloadSize))
258 return {};
259 }
260 else if (!m->ParseFromZeroCopyStream(&stream))
261 return {};
262
263 return m;
264}
265
266template <
267 class T,
268 class Buffers,
269 class Handler,
271bool
272invoke(MessageHeader const& header, Buffers const& buffers, Handler& handler)
273{
274 auto const m = parseMessageContent<T>(header, buffers);
275 if (!m)
276 return false;
277
278 using namespace xrpl::compression;
279 handler.onMessageBegin(
280 header.message_type,
281 m,
282 header.payload_wire_size,
283 header.uncompressed_size,
284 header.algorithm != Algorithm::None);
285 handler.onMessage(m);
286 handler.onMessageEnd(header.message_type, m);
287
288 return true;
289}
290
291} // namespace detail
292
305template <class Buffers, class Handler>
307invokeProtocolMessage(Buffers const& buffers, Handler& handler, std::size_t& hint)
308{
310
311 auto const size = boost::asio::buffer_size(buffers);
312
313 if (size == 0)
314 return result;
315
316 auto header = detail::parseMessageHeader(result.second, buffers, size);
317
318 // If we can't parse the header then it may be that we don't have enough
319 // bytes yet, or because the message was cut off (if error_code is success).
320 // Otherwise we failed to match the header's marker (error_code is set to
321 // no_message) or the compression algorithm is invalid (error_code is
322 // protocol_error) and signal an error.
323 if (!header)
324 return result;
325
326 // We implement a maximum size for protocol messages. Sending a message
327 // whose size exceeds this may result in the connection being dropped. A
328 // larger message size may be supported in the future or negotiated as
329 // part of a protocol upgrade.
330 if (header->payload_wire_size > maximumMessageSize ||
331 header->uncompressed_size > maximumMessageSize)
332 {
333 result.second = make_error_code(boost::system::errc::message_size);
334 return result;
335 }
336
337 // We requested uncompressed messages from the peer but received compressed.
338 if (!handler.compressionEnabled() && header->algorithm != compression::Algorithm::None)
339 {
340 result.second = make_error_code(boost::system::errc::protocol_error);
341 return result;
342 }
343
344 // We don't have the whole message yet. This isn't an error but we have
345 // nothing to do.
346 if (header->total_wire_size > size)
347 {
348 hint = header->total_wire_size - size;
349 return result;
350 }
351
352 bool success = false;
353
354 switch (header->message_type)
355 {
356 case protocol::mtMANIFESTS:
357 success = detail::invoke<protocol::TMManifests>(*header, buffers, handler);
358 break;
359 case protocol::mtPING:
360 success = detail::invoke<protocol::TMPing>(*header, buffers, handler);
361 break;
362 case protocol::mtCLUSTER:
363 success = detail::invoke<protocol::TMCluster>(*header, buffers, handler);
364 break;
365 case protocol::mtENDPOINTS:
366 success = detail::invoke<protocol::TMEndpoints>(*header, buffers, handler);
367 break;
368 case protocol::mtTRANSACTION:
369 success = detail::invoke<protocol::TMTransaction>(*header, buffers, handler);
370 break;
371 case protocol::mtGET_LEDGER:
372 success = detail::invoke<protocol::TMGetLedger>(*header, buffers, handler);
373 break;
374 case protocol::mtLEDGER_DATA:
375 success = detail::invoke<protocol::TMLedgerData>(*header, buffers, handler);
376 break;
377 case protocol::mtPROPOSE_LEDGER:
378 success = detail::invoke<protocol::TMProposeSet>(*header, buffers, handler);
379 break;
380 case protocol::mtSTATUS_CHANGE:
381 success = detail::invoke<protocol::TMStatusChange>(*header, buffers, handler);
382 break;
383 case protocol::mtHAVE_SET:
384 success = detail::invoke<protocol::TMHaveTransactionSet>(*header, buffers, handler);
385 break;
386 case protocol::mtVALIDATION:
387 success = detail::invoke<protocol::TMValidation>(*header, buffers, handler);
388 break;
389 case protocol::mtVALIDATOR_LIST:
390 success = detail::invoke<protocol::TMValidatorList>(*header, buffers, handler);
391 break;
392 case protocol::mtVALIDATOR_LIST_COLLECTION:
393 success =
394 detail::invoke<protocol::TMValidatorListCollection>(*header, buffers, handler);
395 break;
396 case protocol::mtGET_OBJECTS:
397 success = detail::invoke<protocol::TMGetObjectByHash>(*header, buffers, handler);
398 break;
399 case protocol::mtHAVE_TRANSACTIONS:
400 success = detail::invoke<protocol::TMHaveTransactions>(*header, buffers, handler);
401 break;
402 case protocol::mtTRANSACTIONS:
403 success = detail::invoke<protocol::TMTransactions>(*header, buffers, handler);
404 break;
405 case protocol::mtSQUELCH:
406 success = detail::invoke<protocol::TMSquelch>(*header, buffers, handler);
407 break;
408 case protocol::mtPROOF_PATH_REQ:
409 success = detail::invoke<protocol::TMProofPathRequest>(*header, buffers, handler);
410 break;
411 case protocol::mtPROOF_PATH_RESPONSE:
412 success = detail::invoke<protocol::TMProofPathResponse>(*header, buffers, handler);
413 break;
414 case protocol::mtREPLAY_DELTA_REQ:
415 success = detail::invoke<protocol::TMReplayDeltaRequest>(*header, buffers, handler);
416 break;
417 case protocol::mtREPLAY_DELTA_RESPONSE:
418 success = detail::invoke<protocol::TMReplayDeltaResponse>(*header, buffers, handler);
419 break;
420 default:
421 handler.onMessageUnknown(header->message_type);
422 success = true;
423 break;
424 }
425
426 result.first = header->total_wire_size;
427
428 if (!success)
429 result.second = make_error_code(boost::system::errc::bad_message);
430
431 return result;
432}
433
434} // namespace xrpl
Implements ZeroCopyInputStream around a buffer sequence.
T data(T... args)
T is_same_v
std::size_t decompress(InputStream &in, std::size_t inSize, std::uint8_t *decompressed, std::size_t decompressedSize, Algorithm algorithm=Algorithm::LZ4)
Decompress input stream.
Definition Compression.h:29
bool invoke(MessageHeader const &header, Buffers const &buffers, Handler &handler)
std::optional< MessageHeader > parseMessageHeader(boost::system::error_code &ec, BufferSequence const &bufs, std::size_t size)
Parse a message header.
std::shared_ptr< T > parseMessageContent(MessageHeader const &header, Buffers const &buffers)
auto buffersBegin(BufferSequence const &bufs)
auto buffersEnd(BufferSequence const &bufs)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
std::error_code make_error_code(xrpl::TokenCodecErrc e)
protocol::MessageType protocolMessageType(protocol::TMGetLedger const &)
std::pair< std::size_t, boost::system::error_code > invokeProtocolMessage(Buffers const &buffers, Handler &handler, std::size_t &hint)
Calls the handler for up to one protocol message in the passed buffers.
constexpr std::size_t maximumMessageSize
Definition Message.h:14
std::string protocolMessageName(int type)
Returns the name of a protocol message given its type.
T resize(T... args)
std::uint32_t header_size
The size of the header associated with this message.
std::uint32_t uncompressed_size
Uncompressed message size if the message is compressed.
std::uint32_t total_wire_size
The size of the message on the wire.
compression::Algorithm algorithm
Indicates which compression algorithm the payload is compressed with.
std::uint32_t payload_wire_size
The size of the payload on the wire.
std::uint16_t message_type
The type of the message.