xrpld
Loading...
Searching...
No Matches
ProtocolMessage.h
1#pragma once
2
3#include <xrpld/overlay/Compression.h>
4#include <xrpld/overlay/Message.h>
5#include <xrpld/overlay/detail/ZeroCopyStream.h>
6
7#include <xrpl/beast/utility/instrumentation.h>
8#include <xrpl/protocol/messages.h>
9
10#include <boost/asio/buffer.hpp>
11#include <boost/asio/buffers_iterator.hpp>
12
13#include <cstdint>
14#include <optional>
15#include <type_traits>
16#include <vector>
17
18namespace xrpl {
19
20inline protocol::MessageType
21protocolMessageType(protocol::TMGetLedger const&)
22{
23 return protocol::mtGET_LEDGER;
24}
25
26inline protocol::MessageType
27protocolMessageType(protocol::TMReplayDeltaRequest const&)
28{
29 return protocol::mtREPLAY_DELTA_REQ;
30}
31
32inline protocol::MessageType
33protocolMessageType(protocol::TMProofPathRequest const&)
34{
35 return protocol::mtPROOF_PATH_REQ;
36}
37
/** Returns the name of a protocol message given its type.
 *
 * Every `protocol::mt*` value listed in the switch maps to a fixed short
 * string; any value that is not listed falls through to "unknown".
 *
 * NOTE(review): listing lines 40-41 (the return type and the declarator)
 * are absent from this extract. The symbol index at the end of the page
 * gives the signature as `std::string protocolMessageName(int type)` --
 * confirm against the real header.
 */
39template <class = void>
42{
43 switch (type)
44 {
45 case protocol::mtMANIFESTS:
46 return "manifests";
47 case protocol::mtPING:
48 return "ping";
49 case protocol::mtCLUSTER:
50 return "cluster";
51 case protocol::mtENDPOINTS:
52 return "endpoints";
53 case protocol::mtTRANSACTION:
54 return "tx";
55 case protocol::mtGET_LEDGER:
56 return "get_ledger";
57 case protocol::mtLEDGER_DATA:
58 return "ledger_data";
59 case protocol::mtPROPOSE_LEDGER:
60 return "propose";
61 case protocol::mtSTATUS_CHANGE:
62 return "status";
63 case protocol::mtHAVE_SET:
64 return "have_set";
65 case protocol::mtVALIDATOR_LIST:
66 return "validator_list";
67 case protocol::mtVALIDATOR_LIST_COLLECTION:
68 return "validator_list_collection";
69 case protocol::mtVALIDATION:
70 return "validation";
71 case protocol::mtGET_OBJECTS:
72 return "get_objects";
73 case protocol::mtHAVE_TRANSACTIONS:
74 return "have_transactions";
75 case protocol::mtTRANSACTIONS:
76 return "transactions";
77 case protocol::mtSQUELCH:
78 return "squelch";
79 case protocol::mtPROOF_PATH_REQ:
80 return "proof_path_request";
81 case protocol::mtPROOF_PATH_RESPONSE:
82 return "proof_path_response";
83 case protocol::mtREPLAY_DELTA_REQ:
84 return "replay_delta_request";
85 case protocol::mtREPLAY_DELTA_RESPONSE:
86 return "replay_delta_response";
 // Any type value not listed above leaves the switch and is reported as
 // "unknown" by the return below.
87 default:
88 break;
89 }
90 return "unknown";
91}
92
93namespace detail {
94
121
122template <typename BufferSequence>
123auto
124buffersBegin(BufferSequence const& bufs)
125{
126 return boost::asio::buffers_iterator<BufferSequence, std::uint8_t>::begin(bufs);
127}
128
129template <typename BufferSequence>
130auto
131buffersEnd(BufferSequence const& bufs)
132{
133 return boost::asio::buffers_iterator<BufferSequence, std::uint8_t>::end(bufs);
134}
135
/** Parse a message header from the front of a buffer sequence.
 *
 * The first byte selects one of two layouts:
 *  - high bit set: compressed-message header (algorithm bits, payload
 *    size, message type, uncompressed size);
 *  - top six bits clear: uncompressed-message header (payload size,
 *    message type).
 * Multi-byte fields are read most significant byte first.
 *
 * @param ec   Assigned on every visible failure path: `success` when
 *             `size` is smaller than the header size, `protocol_error`
 *             for a rejected compressed header, `no_message` when the
 *             first byte matches neither layout. Not assigned on the
 *             visible paths that return a header.
 * @param bufs Buffer sequence to read from; asserted to be non-empty.
 * @param size Byte count that is compared against the header size.
 * @return The populated header, or std::nullopt after assigning `ec`.
 *
 * NOTE(review): this extract skips listing lines 146, 162, 179, 191, 207,
 * 215 and 220-221, so the return type and several statements are not
 * visible here. The symbol index at the end of the page gives the return
 * type as `std::optional<MessageHeader>`; the other gaps are marked
 * inline and should be confirmed against the real header.
 */
145template <class BufferSequence>
// NOTE(review): listing line 146 (the return type) is missing from this extract.
147parseMessageHeader(boost::system::error_code& ec, BufferSequence const& bufs, std::size_t size)
148{
149 using namespace xrpl::compression;
150
151 MessageHeader hdr;
152 auto iter = buffersBegin(bufs);
153 XRPL_ASSERT(iter != buffersEnd(bufs), "xrpl::detail::parseMessageHeader : non-empty buffer");
154
155 // Check valid header compressed message:
156 // - 4 bits are the compression algorithm, 1st bit is always set to 1
157 // - 2 bits are always set to 0
158 // - 26 bits are the payload size
159 // - 32 bits are the uncompressed data size
160 if (*iter & 0x80)
161 {
 // NOTE(review): listing line 162 is missing; presumably it assigns
 // hdr.headerSize for the compressed layout (the symbol index lists
 // kHeaderBytesCompressed) -- confirm.
163
164 // not enough bytes to parse the header
165 if (size < hdr.headerSize)
166 {
 // `success` plus nullopt: nothing is wrong, there are just not
 // enough bytes to parse the header yet.
167 ec = make_error_code(boost::system::errc::success);
168 return std::nullopt;
169 }
170
 // 0x0C covers the two bits that the layout above requires to be zero.
171 if (*iter & 0x0C)
172 {
173 ec = make_error_code(boost::system::errc::protocol_error);
174 return std::nullopt;
175 }
176
 // Only the upper four bits of the first byte identify the algorithm.
177 hdr.algorithm = static_cast<compression::Algorithm>(*iter & 0xF0);
178
 // NOTE(review): listing line 179 is missing; it holds the condition
 // guarding the block below, presumably a check that the algorithm is
 // a supported one -- confirm.
180 {
181 ec = make_error_code(boost::system::errc::protocol_error);
182 return std::nullopt;
183 }
184
 // Accumulate four bytes, most significant first. The first of them is
 // the flag byte, whose upper bits are cleared right after the loop.
185 for (int i = 0; i != 4; ++i)
186 hdr.payloadWireSize = (hdr.payloadWireSize << 8) + *iter++;
187
188 // clear the top four bits (the compression bits).
189 hdr.payloadWireSize &= 0x0FFFFFFF;
190
 // NOTE(review): listing line 191 is missing; presumably it derives
 // hdr.totalWireSize from the header and payload sizes -- confirm.
192
 // Next two bytes: the message type.
193 for (int i = 0; i != 2; ++i)
194 hdr.messageType = (hdr.messageType << 8) + *iter++;
195
 // Last four header bytes: the size of the data once uncompressed.
196 for (int i = 0; i != 4; ++i)
197 hdr.uncompressedSize = (hdr.uncompressedSize << 8) + *iter++;
198
199 return hdr;
200 }
201
202 // Check valid header uncompressed message:
203 // - 6 bits are set to 0
204 // - 26 bits are the payload size
205 if ((*iter & 0xFC) == 0)
206 {
 // NOTE(review): listing line 207 is missing; presumably it assigns
 // hdr.headerSize for the uncompressed layout (the symbol index lists
 // kHeaderBytes) -- confirm.
208
209 if (size < hdr.headerSize)
210 {
 // As above: `success` plus nullopt means "not enough bytes yet".
211 ec = make_error_code(boost::system::errc::success);
212 return std::nullopt;
213 }
214
 // NOTE(review): listing line 215 is missing; presumably it records
 // that no compression algorithm is in use -- confirm.
216
 // No mask is applied after this loop: the branch condition already
 // established that the top six bits of the first byte are zero.
217 for (int i = 0; i != 4; ++i)
218 hdr.payloadWireSize = (hdr.payloadWireSize << 8) + *iter++;
219
 // NOTE(review): listing lines 220-221 are missing; presumably they
 // fill in the remaining size fields of hdr -- confirm.
222
 // Final two header bytes: the message type.
223 for (int i = 0; i != 2; ++i)
224 hdr.messageType = (hdr.messageType << 8) + *iter++;
225
226 return hdr;
227 }
228
 // The first byte matched neither header layout.
229 ec = make_error_code(boost::system::errc::no_message);
230 return std::nullopt;
231}
232
/** Parse the payload that follows an already-parsed header into a T.
 *
 * A fresh T is created with std::make_shared, the header bytes are skipped
 * on a ZeroCopyInputStream over `buffers`, and the remainder is parsed
 * either directly from the stream or, in the first branch, after being
 * decompressed into a temporary buffer.
 *
 * @param header  Previously parsed header; supplies headerSize,
 *                payloadWireSize, uncompressedSize and algorithm.
 * @param buffers Buffer sequence holding the header followed by the payload.
 * @return The parsed message, or an empty value when decompression yields
 *         zero bytes or parsing fails.
 *
 * NOTE(review): listing lines 236-237, 245 and 247 are missing from this
 * extract. The symbol index at the end of the page gives the return type as
 * `std::shared_ptr<T>`; the other gaps are marked inline.
 */
233template <
234 class T,
235 class Buffers,
// NOTE(review): listing lines 236-237 are missing; they hold the rest of the
// template parameter list and the return type.
238parseMessageContent(MessageHeader const& header, Buffers const& buffers)
239{
240 auto m = std::make_shared<T>();
241
242 ZeroCopyInputStream<Buffers> stream(buffers);
 // Position the stream just past the header so only payload bytes remain.
243 stream.Skip(header.headerSize);
244
 // NOTE(review): listing line 245 is missing; it holds the condition that
 // selects this decompression branch, presumably a test of
 // header.algorithm -- confirm.
246 {
 // NOTE(review): listing line 247 is missing; presumably it declares
 // `payload` as a byte vector -- confirm.
248 payload.resize(header.uncompressedSize);
249
250 auto const payloadSize = xrpl::compression::decompress(
251 stream,
252 header.payloadWireSize,
253 payload.data(),
254 header.uncompressedSize,
255 header.algorithm);
256
 // A zero result from decompress is treated the same as a parse failure.
257 if (payloadSize == 0 || !m->ParseFromArray(payload.data(), payloadSize))
258 return {};
259 }
 // Otherwise parse straight from the stream without an intermediate copy.
260 else if (!m->ParseFromZeroCopyStream(&stream))
261 {
262 return {};
263 }
264
265 return m;
266}
267
/** Parse one message of type T and hand it to the handler.
 *
 * On a successful parse the handler receives, in this order,
 * onMessageBegin, onMessage and onMessageEnd for the same message.
 *
 * @param header  Previously parsed header describing the message.
 * @param buffers Buffer sequence holding the header followed by the payload.
 * @param handler Object receiving the three callbacks.
 * @return false when the payload could not be parsed (no callback is made),
 *         true after all three callbacks have been invoked.
 *
 * NOTE(review): listing line 272 (the end of the template parameter list)
 * is missing from this extract.
 */
268template <
269 class T,
270 class Buffers,
271 class Handler,
// NOTE(review): listing line 272 is missing; it closes the template
// parameter list.
273bool
274invoke(MessageHeader const& header, Buffers const& buffers, Handler& handler)
275{
276 auto const m = parseMessageContent<T>(header, buffers);
277 if (!m)
278 return false;
279
280 using namespace xrpl::compression;
 // The last argument tells the handler whether the message arrived
 // compressed, i.e. whether the algorithm is anything other than None.
281 handler.onMessageBegin(
282 header.messageType,
283 m,
284 header.payloadWireSize,
285 header.uncompressedSize,
286 header.algorithm != Algorithm::None);
287 handler.onMessage(m);
288 handler.onMessageEnd(header.messageType, m);
289
290 return true;
291}
292
293} // namespace detail
294
/** Calls the handler for up to one protocol message in the passed buffers.
 *
 * The header is parsed and validated first. Only when the whole message is
 * present is it dispatched, by message type, to the matching
 * detail::invoke<T> instantiation.
 *
 * @param buffers Buffer sequence holding the bytes received so far.
 * @param handler Receives compressionEnabled() queries and the message
 *                callbacks; onMessageUnknown() is called for message types
 *                not listed in the switch.
 * @param hint    Written only when the message is incomplete: the number
 *                of bytes still missing (totalWireSize minus the bytes
 *                available).
 * @return `result`: `first` is set to the message's total wire size once a
 *         message has been dispatched, `second` carries the error code
 *         (message_size, protocol_error, bad_message, or whatever
 *         parseMessageHeader assigned).
 *
 * NOTE(review): listing lines 308, 311 and 396 are missing from this
 * extract. The symbol index at the end of the page gives the return type as
 * `std::pair<std::size_t, boost::system::error_code>`; the other gaps are
 * marked inline.
 */
307template <class Buffers, class Handler>
// NOTE(review): listing line 308 (the return type) is missing from this extract.
309invokeProtocolMessage(Buffers const& buffers, Handler& handler, std::size_t& hint)
310{
 // NOTE(review): listing line 311 is missing; it declares `result`,
 // presumably initialised to zero bytes consumed and no error -- confirm.
312
313 auto const size = boost::asio::buffer_size(buffers);
314
 // Nothing to look at; return `result` as initialised.
315 if (size == 0)
316 return result;
317
318 auto header = detail::parseMessageHeader(result.second, buffers, size);
319
320 // If we can't parse the header then it may be that we don't have enough
321 // bytes yet, or because the message was cut off (if error_code is success).
322 // Otherwise we failed to match the header's marker (error_code is set to
323 // no_message) or the compression algorithm is invalid (error_code is
324 // protocol_error) and signal an error.
325 if (!header)
326 return result;
327
328 // We implement a maximum size for protocol messages. Sending a message
329 // whose size exceeds this may result in the connection being dropped. A
330 // larger message size may be supported in the future or negotiated as
331 // part of a protocol upgrade.
332 if (header->payloadWireSize > kMaximumMessageSize ||
333 header->uncompressedSize > kMaximumMessageSize)
334 {
335 result.second = make_error_code(boost::system::errc::message_size);
336 return result;
337 }
338
339 // We requested uncompressed messages from the peer but received compressed.
340 if (!handler.compressionEnabled() && header->algorithm != compression::Algorithm::None)
341 {
342 result.second = make_error_code(boost::system::errc::protocol_error);
343 return result;
344 }
345
346 // We don't have the whole message yet. This isn't an error but we have
347 // nothing to do.
348 if (header->totalWireSize > size)
349 {
 // Report how many more bytes are needed to complete the message.
350 hint = header->totalWireSize - size;
351 return result;
352 }
353
354 bool success = false;
355
 // Dispatch on the message type to the matching protobuf message class.
356 switch (header->messageType)
357 {
358 case protocol::mtMANIFESTS:
359 success = detail::invoke<protocol::TMManifests>(*header, buffers, handler);
360 break;
361 case protocol::mtPING:
362 success = detail::invoke<protocol::TMPing>(*header, buffers, handler);
363 break;
364 case protocol::mtCLUSTER:
365 success = detail::invoke<protocol::TMCluster>(*header, buffers, handler);
366 break;
367 case protocol::mtENDPOINTS:
368 success = detail::invoke<protocol::TMEndpoints>(*header, buffers, handler);
369 break;
370 case protocol::mtTRANSACTION:
371 success = detail::invoke<protocol::TMTransaction>(*header, buffers, handler);
372 break;
373 case protocol::mtGET_LEDGER:
374 success = detail::invoke<protocol::TMGetLedger>(*header, buffers, handler);
375 break;
376 case protocol::mtLEDGER_DATA:
377 success = detail::invoke<protocol::TMLedgerData>(*header, buffers, handler);
378 break;
379 case protocol::mtPROPOSE_LEDGER:
380 success = detail::invoke<protocol::TMProposeSet>(*header, buffers, handler);
381 break;
382 case protocol::mtSTATUS_CHANGE:
383 success = detail::invoke<protocol::TMStatusChange>(*header, buffers, handler);
384 break;
385 case protocol::mtHAVE_SET:
386 success = detail::invoke<protocol::TMHaveTransactionSet>(*header, buffers, handler);
387 break;
388 case protocol::mtVALIDATION:
389 success = detail::invoke<protocol::TMValidation>(*header, buffers, handler);
390 break;
391 case protocol::mtVALIDATOR_LIST:
392 success = detail::invoke<protocol::TMValidatorList>(*header, buffers, handler);
393 break;
394 case protocol::mtVALIDATOR_LIST_COLLECTION:
395 success =
 // NOTE(review): listing line 396 is missing; it holds the right-hand
 // side of this assignment, presumably the detail::invoke call for the
 // validator-list-collection message type -- confirm.
397 break;
398 case protocol::mtGET_OBJECTS:
399 success = detail::invoke<protocol::TMGetObjectByHash>(*header, buffers, handler);
400 break;
401 case protocol::mtHAVE_TRANSACTIONS:
402 success = detail::invoke<protocol::TMHaveTransactions>(*header, buffers, handler);
403 break;
404 case protocol::mtTRANSACTIONS:
405 success = detail::invoke<protocol::TMTransactions>(*header, buffers, handler);
406 break;
407 case protocol::mtSQUELCH:
408 success = detail::invoke<protocol::TMSquelch>(*header, buffers, handler);
409 break;
410 case protocol::mtPROOF_PATH_REQ:
411 success = detail::invoke<protocol::TMProofPathRequest>(*header, buffers, handler);
412 break;
413 case protocol::mtPROOF_PATH_RESPONSE:
414 success = detail::invoke<protocol::TMProofPathResponse>(*header, buffers, handler);
415 break;
416 case protocol::mtREPLAY_DELTA_REQ:
417 success = detail::invoke<protocol::TMReplayDeltaRequest>(*header, buffers, handler);
418 break;
419 case protocol::mtREPLAY_DELTA_RESPONSE:
420 success = detail::invoke<protocol::TMReplayDeltaResponse>(*header, buffers, handler);
421 break;
 // Unrecognised types are reported to the handler and are not treated
 // as a failure.
422 default:
423 handler.onMessageUnknown(header->messageType);
424 success = true;
425 break;
426 }
427
 // The full wire size is reported even when parsing the payload failed;
 // the failure itself is signalled through the error code below.
428 result.first = header->totalWireSize;
429
430 if (!success)
431 result.second = make_error_code(boost::system::errc::bad_message);
432
433 return result;
434}
435
436} // namespace xrpl
Implements ZeroCopyInputStream around a buffer sequence.
T data(T... args)
T make_shared(T... args)
constexpr std::size_t kHeaderBytesCompressed
Definition Compression.h:9
constexpr std::size_t kHeaderBytes
Definition Compression.h:8
std::size_t decompress(InputStream &in, std::size_t inSize, std::uint8_t *decompressed, std::size_t decompressedSize, Algorithm algorithm=Algorithm::LZ4)
Decompress input stream.
Definition Compression.h:27
bool invoke(MessageHeader const &header, Buffers const &buffers, Handler &handler)
std::optional< MessageHeader > parseMessageHeader(boost::system::error_code &ec, BufferSequence const &bufs, std::size_t size)
Parse a message header.
std::shared_ptr< T > parseMessageContent(MessageHeader const &header, Buffers const &buffers)
auto buffersBegin(BufferSequence const &bufs)
auto buffersEnd(BufferSequence const &bufs)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
constexpr std::size_t kMaximumMessageSize
Definition Message.h:14
std::error_code make_error_code(xrpl::TokenCodecErrc e)
protocol::MessageType protocolMessageType(protocol::TMGetLedger const &)
std::pair< std::size_t, boost::system::error_code > invokeProtocolMessage(Buffers const &buffers, Handler &handler, std::size_t &hint)
Calls the handler for up to one protocol message in the passed buffers.
std::string protocolMessageName(int type)
Returns the name of a protocol message given its type.
T resize(T... args)
std::uint32_t uncompressedSize
Uncompressed message size if the message is compressed.
std::uint16_t messageType
The type of the message.
std::uint32_t headerSize
The size of the header associated with this message.
compression::Algorithm algorithm
Indicates which compression algorithm the payload is compressed with.
std::uint32_t totalWireSize
The size of the message on the wire.
std::uint32_t payloadWireSize
The size of the payload on the wire.