fusion_engine_framer.cc
Go to the documentation of this file.
1 /**************************************************************************/ /**
2  * @brief FusionEngine message framer.
3  * @file
4  ******************************************************************************/
5 
6 #define P1_VMODULE_NAME fusion_engine_framer
7 
9 
10 #include <cstring> // For memmove()
11 #if P1_HAVE_STD_OSTREAM
12 # include <iomanip>
13 # include <ostream>
14 # include <type_traits>
15 #endif
16 
19 
21 using namespace point_one::fusion_engine::parsers;
22 
23 /******************************************************************************/
24 template <typename T>
26  public:
27  HexPrintableIntegerInst(T value) : value_(value) {}
28 
29  template <typename U> // all instantiations of this template are my friends
31 
32  private:
33  const T value_;
34 };
35 
36 /******************************************************************************/
37 template <typename T>
39  const HexPrintableIntegerInst<T>& obj) {
40 #if P1_HAVE_STD_OSTREAM
41  static_assert(std::is_integral<T>::value, "Integer required.");
42 
43  stream << "0x" << std::hex << std::setfill('0') << std::setw(sizeof(obj) * 2);
44 
45  if (sizeof(T) == 1) {
46  stream << (((unsigned)obj.value_) & 0xFF);
47  } else {
48  stream << obj.value_;
49  }
50 
51  stream << std::dec;
52 
53  if (sizeof(obj) == 1) {
54  if (obj.value_ >= 0x20 && obj.value_ <= 0x7E) {
55  stream << " ('" << (char)obj.value_ << "')";
56  } else {
57  stream << " (---)";
58  }
59  }
60 #endif
61  return stream;
62 }
63 
64 /**
65  * @brief Wrap an integer so it will be output to a stream as its hex
66  * representation.
67  *
68  * For example:
69  *
70  * ```cpp
71  * std::cout << HexPrintableValue((int16_t)-255) << std::endl;
72  * std::cout << HexPrintableValue((uint32_t)255) << std::endl;
73  * std::cout << HexPrintableValue((uint8_t)48) << std::endl;
74  * ```
75  *
76  * generates the following output:
77  *
78  * ```
79  * 0xff01
80  * 0x000000ff
81  * 0x30 ('0')
82  * ```
83  *
84  * @tparam T The type of the value parameter (inferred implicitly).
85  * @param value The integer value to wrap.
86  *
87  * @return The wrapped integer that can be used in an @ref p1_ostream.
88  */
89 template <typename T>
91  return HexPrintableIntegerInst<T>(value);
92 }
93 
94 /******************************************************************************/
95 FusionEngineFramer::FusionEngineFramer(void* buffer, size_t capacity_bytes) {
96  // Allocate a buffer internally.
97  if (buffer == nullptr) {
98  // We enforce 4B alignment, so we need to allocate 3 extra bytes to
99  // guarantee that the buffer has at least the requested capacity.
100  SetBuffer(nullptr, capacity_bytes + 3);
101  }
102  // Use user-provided storage.
103  else {
104  SetBuffer(buffer, capacity_bytes);
105  }
106 }
107 
108 /******************************************************************************/
109 FusionEngineFramer::~FusionEngineFramer() { ClearManagedBuffer(); }
110 
111 /******************************************************************************/
112 void FusionEngineFramer::SetBuffer(void* buffer, size_t capacity_bytes) {
113  if (capacity_bytes < sizeof(MessageHeader)) {
114  LOG(ERROR) << "FusionEngine framing buffer too small. [capacity="
115  << capacity_bytes << " B, min=" << sizeof(MessageHeader)
116  << " B]";
117  return;
118  }
119  // Restrict the buffer capacity to 2^31 bytes. We don't expect to ever have a
120  // single message anywhere near that large, and don't expect users to ever
121  // pass in a buffer that size. Using uint32_t instead of size_t internally
122  // makes it easier to guarantee behavior between 64b and 32b architectures. We
123  // do 2^31, not 2^32, so we can use int32_t for return values internally.
124  else if (capacity_bytes > 0x7FFFFFFF) {
125  LOG(WARNING) << "Limiting buffer capacity to 2^31 B. [original_capacity="
126  << capacity_bytes << " B]";
127  capacity_bytes = 0x7FFFFFFF;
128  }
129 
130  ClearManagedBuffer();
131  if (buffer == nullptr) {
132  buffer = new uint8_t[capacity_bytes];
133  is_buffer_managed_ = true;
134  }
135 
136  // Enforce 4B alignment at the beginning of the buffer.
137  uint8_t* buffer_unaligned = static_cast<uint8_t*>(buffer);
138  buffer_ = reinterpret_cast<uint8_t*>(
139  (reinterpret_cast<size_t>(buffer_unaligned) + 3) &
140  ~(static_cast<size_t>(3)));
141  capacity_bytes_ =
142  static_cast<uint32_t>(capacity_bytes - (buffer_ - buffer_unaligned));
143 
144  Reset();
145 }
146 
147 /******************************************************************************/
148 void FusionEngineFramer::Reset() {
149  state_ = State::SYNC0;
150  next_byte_index_ = 0;
151  current_message_size_ = 0;
152 }
153 
154 /******************************************************************************/
155 size_t FusionEngineFramer::OnData(const uint8_t* buffer, size_t length_bytes) {
156  // Process each byte. If the user-supplied buffer was too small, we can't
157  // parse messages.
158  if (buffer_ != nullptr) {
159  VLOG(2) << "Received " << length_bytes << " bytes.";
160  size_t total_dispatched_bytes = 0;
161  for (size_t idx = 0; idx < length_bytes; ++idx) {
162  uint8_t byte = buffer[idx];
163  buffer_[next_byte_index_++] = byte;
164  int32_t dispatched_message_size = OnByte(false);
165  if (dispatched_message_size == 0) {
166  // Waiting for more data. Nothing to do.
167  } else if (dispatched_message_size > 0) {
168  // Message framed successfully. Reset for the next one.
169  next_byte_index_ = 0;
170  total_dispatched_bytes += (size_t)dispatched_message_size;
171  } else if (next_byte_index_ > 0) {
172  // If OnByte() indicated an error (size < 0) and there is still data in
173  // the buffer, either the CRC failed or the payload was too big to fit
174  // in the buffer.
175  //
176  // In either case, it is possible we found what looked like the preamble
177  // somewhere within the data stream but it wasn't actually a valid
178  // message. In that case, the data we processed may contain the start of
179  // a valid message, or even one or more complete messages, starting
180  // somewhere after byte 0. Perform a resync operation to find valid
181  // messages.
182  total_dispatched_bytes += Resync();
183  } else {
184  // OnByte() caught an unrecoverable error and reset the buffer. Nothing
185  // to do.
186  }
187  }
188  return total_dispatched_bytes;
189  } else {
190  return 0;
191  }
192 }
193 
194 /******************************************************************************/
/**
 * @brief Process the most recently stored byte and advance the framing state
 *        machine (SYNC0 -> SYNC1 -> HEADER -> DATA).
 *
 * The caller must already have written the byte to
 * `buffer_[next_byte_index_ - 1]` (see OnData() and Resync()). When a complete
 * message passes its CRC check, it is dispatched to the registered callback(s)
 * and the state returns to SYNC0.
 *
 * @param quiet If `true`, rejected candidates are logged at VLOG(2) instead of
 *        as warnings. Forced to `true` when `warn_on_error_` is `false`.
 *
 * @return
 * - The size (bytes) of the dispatched message, if one was completed.
 * - 0 if more data is needed. This includes a preamble mismatch, which discards
 *   the collected bytes, no buffer being set, or no byte being pending.
 * - -1 if the candidate was rejected: size overflow, nonzero reserved bytes,
 *   too large for the buffer, or CRC failure. The state returns to SYNC0, but
 *   `next_byte_index_` is left as-is so the caller can resync over the
 *   collected bytes.
 * - -1 after Reset() on an invalid state.
 */
int32_t FusionEngineFramer::OnByte(bool quiet) {
  // User-supplied buffer was too small. Can't parse messages.
  if (buffer_ == nullptr) {
    return 0;
  }

  // If warnings are disabled, run in quiet mode.
  if (!warn_on_error_) {
    quiet = true;
  }

  // Pull out the byte being processed. The caller stores it at
  // buffer_[next_byte_index_ - 1] before calling this function.
  if (next_byte_index_ == 0) {
    LOG(ERROR) << "Byte not found in buffer.";
    return 0;
  }

  uint8_t byte = buffer_[next_byte_index_ - 1];

  // Look for the first sync byte.
  //
  // Note that we always put the first byte at offset 0 in the buffer, and the
  // buffer is guaranteed to be 4B-aligned by SetBuffer(), so the framed
  // message will always be 4B aligned.
  bool crc_check_needed = false;
  if (state_ == State::SYNC0) {
    VLOG(4) << "Searching for sync byte 0. [byte=" << HexPrintableInteger(byte)
            << "]";
    if (byte == MessageHeader::SYNC0) {
      VLOG(4) << "Found sync byte 0.";
      state_ = State::SYNC1;
    } else {
      // Not a sync byte: un-store it so the next byte is written to the same
      // slot (offset 0).
      --next_byte_index_;
    }
  }
  // Look for the second sync byte.
  else if (state_ == State::SYNC1) {
    VLOG(4) << "Searching for sync byte 1. [byte=" << HexPrintableInteger(byte)
            << "]";
    if (byte == MessageHeader::SYNC0) {
      // Another SYNC0: keep waiting for SYNC1. When called from OnData(),
      // decrementing the index lets the next byte overwrite this duplicate so
      // the preamble stays at offset 0.
      VLOG(4) << "Found duplicate sync byte 0.";
      state_ = State::SYNC1;
      --next_byte_index_;
    } else if (byte == MessageHeader::SYNC1) {
      VLOG(3) << "Preamble found. Waiting for header.";
      state_ = State::HEADER;
    } else {
      // Preamble mismatch: discard everything collected so far.
      VLOG(4) << "Did not find sync byte 1. Resetting. [byte="
              << HexPrintableInteger(byte) << "]";
      state_ = State::SYNC0;
      next_byte_index_ = 0;
      current_message_size_ = 0;
    }
  }
  // Search for a message header.
  else if (state_ == State::HEADER) {
    VLOG(4) << "Received " << next_byte_index_ << "/" << sizeof(MessageHeader)
            << " header bytes. [byte=" << HexPrintableInteger(byte) << "]";

    // Check if the header is complete.
    if (next_byte_index_ == sizeof(MessageHeader)) {
      auto* header = reinterpret_cast<MessageHeader*>(buffer_);
      current_message_size_ =
          sizeof(MessageHeader) + header->payload_size_bytes;
      VLOG(3) << "Header complete. Waiting for payload. [message="
              << header->message_type << ", seq=" << header->sequence_number
              << ", payload_size=" << header->payload_size_bytes
              << " B, message_size=" << current_message_size_ << " B]";

      // Check for overflow of the message size uint32_t variable. We don't
      // currently expect to have _extremely_ large packets, so this should
      // never happen for a valid message. If it does, assume this is not a
      // valid message, and instead the sync pattern likely showed up at random
      // in the data stream.
      if (current_message_size_ < header->payload_size_bytes) {
        if (quiet) {
          VLOG(2) << "Message size overflow. Dropping suspected invalid sync. "
                     "[size="
                  << current_message_size_
                  << " B (payload=" << header->payload_size_bytes << " B)]";
        } else {
          LOG(WARNING)
              << "Message size overflow. Dropping suspected invalid sync. "
                 "[size="
              << current_message_size_
              << " B (payload=" << header->payload_size_bytes << " B)]";
        }

        // next_byte_index_ is left unchanged so the caller can resync.
        state_ = State::SYNC0;
        return -1;
      }
      // The reserved bytes in the header are currently always set to 0. If the
      // incoming bytes are not zero, assume this is an invalid sync.
      //
      // This may change in the future, but for now it prevents us from
      // needing to collect a ton of bytes before performing a CRC check if a
      // bogus header from an invalid sync has a very large payload size that
      // happens to still be smaller than the buffer size.
      else if (header->reserved[0] != 0 || header->reserved[1] != 0) {
        if (quiet) {
          VLOG(2) << "Reserved bytes nonzero. Dropping suspected invalid sync. "
                     "[size="
                  << current_message_size_
                  << " B (payload=" << header->payload_size_bytes << " B)]";
        } else {
          LOG(WARNING) << "Reserved bytes nonzero. Dropping suspected invalid "
                          "sync. [size="
                       << current_message_size_
                       << " B (payload=" << header->payload_size_bytes
                       << " B)]";
        }

        state_ = State::SYNC0;
        return -1;
      }
      // If the message is too large to fit in the buffer, we cannot parse it.
      //
      // If this is an invalid sync, the parsed (invalid) payload length may
      // exceed the buffer size and the invalid header will be dropped. If it
      // does not exceed the buffer size, it'll get caught later during the CRC
      // check.
      else if (current_message_size_ > capacity_bytes_) {
        if (quiet) {
          VLOG(2) << "Message too large for buffer. [size="
                  << current_message_size_
                  << " B (payload=" << header->payload_size_bytes
                  << " B), buffer_capacity=" << capacity_bytes_
                  << " B (max_payload="
                  << capacity_bytes_ - sizeof(MessageHeader) << " B)]";
        } else {
          LOG(WARNING) << "Message too large for buffer. [size="
                       << current_message_size_
                       << " B (payload=" << header->payload_size_bytes
                       << " B), buffer_capacity=" << capacity_bytes_
                       << " B (max_payload="
                       << capacity_bytes_ - sizeof(MessageHeader) << " B)]";
        }

        state_ = State::SYNC0;
        return -1;
      }
      // Sanity checks passed. Start collecting the message payload next.
      else {
        // If there's no payload, do the CRC check now.
        if (header->payload_size_bytes == 0) {
          VLOG(3) << "Message has no payload. Checking CRC.";
          crc_check_needed = true;
        }
        // Otherwise, collect the payload, then do the CRC check.
        else {
          state_ = State::DATA;
        }
      }
    }
  }
  // Collect the message payload.
  else if (state_ == State::DATA) {
    VLOG(4) << "Received " << next_byte_index_ << "/" << current_message_size_
            << " message bytes (" << next_byte_index_ - sizeof(MessageHeader)
            << "/" << current_message_size_ - sizeof(MessageHeader)
            << " payload bytes). [byte=" << HexPrintableInteger(byte) << "]";

    // If we received the full payload, check the CRC and dispatch it.
    if (next_byte_index_ == current_message_size_) {
      VLOG(3) << "Payload complete. Checking CRC.";
      crc_check_needed = true;
    }
  }
  // Illegal state.
  else {
    LOG(ERROR) << "Impossible parsing state.";
    Reset();
    return -1;
  }

  // Payload complete (or message has no payload). Check the CRC.
  if (crc_check_needed) {
    uint32_t crc = CalculateCRC(buffer_);
    auto* header = reinterpret_cast<MessageHeader*>(buffer_);
    if (crc == header->crc) {
      VLOG(1) << "CRC passed. Dispatching message. [message="
              << header->message_type << " (" << (unsigned)header->message_type
              << "), seq=" << header->sequence_number
              << ", size=" << current_message_size_
              << " B, crc=" << HexPrintableInteger(crc) << "]";
      // The payload immediately follows the header in the buffer.
      auto* payload = reinterpret_cast<uint8_t*>(header + 1);
#if P1_HAVE_STD_FUNCTION
      if (callback_) {
        callback_(*header, payload);
      }
#endif // P1_HAVE_STD_FUNCTION
      if (raw_callback_) {
        raw_callback_(raw_callback_context_, *header, payload);
      }
      // The caller is responsible for resetting next_byte_index_ (see
      // OnData() and Resync()).
      state_ = State::SYNC0;
      return static_cast<int32_t>(current_message_size_);
    } else {
      if (quiet) {
        VLOG(2) << "CRC check failed. [message=" << header->message_type << " ("
                << (unsigned)header->message_type
                << "), seq=" << header->sequence_number
                << ", size=" << current_message_size_
                << " B, crc=" << HexPrintableInteger(crc)
                << ", expected_crc=" << HexPrintableInteger(header->crc) << "]";
      } else {
        LOG(WARNING) << "CRC check failed. [message=" << header->message_type
                     << " (" << (unsigned)header->message_type
                     << "), seq=" << header->sequence_number
                     << ", size=" << current_message_size_
                     << " B, crc=" << HexPrintableInteger(crc)
                     << ", expected_crc=" << HexPrintableInteger(header->crc)
                     << "]";
      }
      state_ = State::SYNC0;
      return -1;
    }
  }

  // No messages completed.
  return 0;
}
416 
417 /******************************************************************************/
418 uint32_t FusionEngineFramer::Resync() {
419  // If the message preamble shows up randomly somewhere in the data stream, we
420  // may sync to it and try to parse a message starting at that arbitrary
421  // location. We will eventually detect the bad sync either by CRC failure or
422  // because the payload size we extract from the bogus message header is too
423  // large.
424  //
425  // In either case, the bytes we collected - both the bogus header and the
426  // payload bytes based on the size in the header - may contain the start of a
427  // valid message, or even one or more complete messages. We need to resync and
428  // try to find those messages. Any of the following scenarios is possible:
429  // ...validvalidval <-- Contiguous valid messages
430  // ...valid...valid...val <-- Multiple valid messages, separated by invalid
431  // ...valid...valid... <-- Similar, but ending with invalid
432  // ...val <-- Start of a valid message, no complete messages
433  // ... <-- No valid content
434  //
435  // Ideally, we would search for these messages in-place and avoid shifting the
436  // data around in the buffer. However, since we need the messages to be
437  // 4B-aligned and there's only a 1-in-4 chance of that happening naturally,
438  // we'd have to shift the data 75% of the time regardless.
439  //
440  // Given that, we simply shift all data left in the buffer and process one
441  // message at a time. This is not as efficient, but it's the simplest option.
442  uint32_t available_bytes = next_byte_index_;
443  VLOG(1) << "Attempting resynchronization. [" << available_bytes - 1
444  << " candidate bytes]";
445  uint32_t total_message_size = 0;
446  state_ = State::SYNC0;
447  next_byte_index_ = 0;
448  for (uint32_t offset = 1; offset < available_bytes; ++offset) {
449  uint8_t current_byte = buffer_[offset];
450 
451  // Skip forward until we see a SYNC0.
452  if (state_ == State::SYNC0) {
453  if (current_byte == MessageHeader::SYNC0) {
454  VLOG(1) << "Candidate message start found @ offset " << offset << "/"
455  << available_bytes << ".";
456  // Shift all of the data left in the buffer.
457  available_bytes -= offset;
458  std::memmove(buffer_, buffer_ + offset, available_bytes);
459  offset = 0;
460  } else {
461  VLOG(4) << "Skipping non-sync byte 0 @ offset " << offset << "/"
462  << available_bytes
463  << ". [byte=" << HexPrintableInteger(current_byte) << "]";
464  continue;
465  }
466  }
467 
468  // Process this byte. If we end up back in the SYNC0 state, either A) the
469  // SYNC0 we found was a valid message and got dispatched, or B) was not the
470  // start of a valid message.
471  //
472  // In (A), message_size > 0 indicating there was a valid message, so we know
473  // we can just keep going with the rest of the data in the buffer.
474  //
475  // In (B), message_size == 0. In that case we'll rewind back to the byte
476  // just after we located the SYNC0 and see if there's another one.
477  //
478  // Note that next_byte_index_ always points to the next open slot, i.e.,
479  // one byte _after_ the current byte.
480  next_byte_index_ = offset + 1;
481  int32_t message_size = OnByte(true);
482 
483  if (state_ == State::SYNC0) {
484  // Note that offset will be incremented when we loop around, so we set it
485  // to N-1, where N is wherever we want to start searching next.
486  if (message_size > 0) {
487  total_message_size += message_size;
488  offset = message_size - 1;
489  VLOG(1)
490  << "Resync found a complete message. Continuing search @ offset "
491  << offset + 1 << "/" << available_bytes
492  << ". [message_size=" << message_size << ", "
493  << (available_bytes - message_size - 1)
494  << " candidate bytes remaining]";
495  } else {
496  size_t prev_offset = offset;
497  offset = 0;
498  VLOG(1) << "Candidate message rejected after " << prev_offset
499  << " bytes. Restarting search @ offset " << offset + 1 << "/"
500  << available_bytes << ". [" << available_bytes - 1
501  << " candidate bytes remaining]";
502  }
503 
504  next_byte_index_ = 0;
505  }
506  }
507 
508  VLOG(1) << "Resynchronization finished. " << next_byte_index_
509  << " bytes remaining in buffer.";
510 
511  return total_message_size;
512 }
513 
514 /******************************************************************************/
515 void FusionEngineFramer::ClearManagedBuffer() {
516  if (is_buffer_managed_ && buffer_ != nullptr) {
517  delete[] buffer_;
518  is_buffer_managed_ = false;
519  buffer_ = nullptr;
520  }
521 }
static constexpr uint8_t SYNC0
Definition: defs.h:609
uint32_t CalculateCRC(const void *buffer, size_t length, uint32_t initial_value)
Calculate the CRC for the message (payload) contained in the buffer.
Definition: crc.cc:45
The header present at the beginning of every message.
Definition: defs.h:608
#define VLOG(verboselevel)
Definition: logging.h:88
Message CRC support.
HexPrintableIntegerInst< T > HexPrintableInteger(T value)
Wrap an integer so it will be output to a stream as its hex representation.
Definition: configuration.h:14
static constexpr uint8_t SYNC1
Definition: defs.h:610
FusionEngine message framer.
HexPrintableIntegerInst(T value)
std::ostream p1_ostream
Definition: portability.h:75
uint32_t payload_size_bytes
The size of the serialized message (bytes).
Definition: defs.h:649
p1_ostream & operator<<(p1_ostream &stream, ConfigType type)
ConfigType stream operator.
friend p1_ostream & operator<<(p1_ostream &, const HexPrintableIntegerInst< U > &)
const T value_
#define LOG(severity)
Definition: logging.h:77
API wrapper for optional compilation of logging support.