fusion_engine_framer.cc
Go to the documentation of this file.
1 /**************************************************************************/ /**
2  * @brief FusionEngine message framer.
3  * @file
4  ******************************************************************************/
5 
6 #define P1_VMODULE_NAME fusion_engine_framer
7 
9 
10 #include <cstring> // For memmove()
11 #if P1_HAVE_STD_OSTREAM
12 # include <iomanip>
13 # include <ostream>
14 # include <type_traits>
15 #endif
16 
19 
21 using namespace point_one::fusion_engine::parsers;
22 
23 /******************************************************************************/
24 template <typename T>
26  public:
27  HexPrintableIntegerInst(T value) : value_(value) {}
28 
29  template <typename U> // all instantiations of this template are my friends
31 
32  private:
33  const T value_;
34 };
35 
36 /******************************************************************************/
37 template <typename T>
39  const HexPrintableIntegerInst<T>& obj) {
40 #if P1_HAVE_STD_OSTREAM
41  static_assert(std::is_integral<T>::value, "Integer required.");
42 
43  stream << "0x" << std::hex << std::setfill('0') << std::setw(sizeof(obj) * 2);
44 
45  if (sizeof(T) == 1) {
46  stream << (((unsigned)obj.value_) & 0xFF);
47  } else {
48  stream << obj.value_;
49  }
50 
51  stream << std::dec;
52 
53  if (sizeof(obj) == 1) {
54  if (obj.value_ >= 0x20 && obj.value_ <= 0x7E) {
55  stream << " ('" << (char)obj.value_ << "')";
56  } else {
57  stream << " (---)";
58  }
59  }
60 #endif
61  return stream;
62 }
63 
64 /**
65  * @brief Wrap an integer so it will be output to a stream as its hex
66  * representation.
67  *
68  * For example:
69  *
70  * ```cpp
71  * std::cout << HexPrintableValue((int16_t)-255) << std::endl;
72  * std::cout << HexPrintableValue((uint32_t)255) << std::endl;
73  * std::cout << HexPrintableValue((uint8_t)48) << std::endl;
74  * ```
75  *
76  * generates the following output:
77  *
78  * ```
79  * 0xff01
80  * 0x000000ff
81  * 0x30 ('0')
82  * ```
83  *
84  * @tparam T The type of the value parameter (inferred implicitly).
85  * @param value The integer value to wrap.
86  *
87  * @return The wrapped integer that can be used in an @ref p1_ostream.
88  */
89 template <typename T>
91  return HexPrintableIntegerInst<T>(value);
92 }
93 
94 /******************************************************************************/
95 FusionEngineFramer::FusionEngineFramer(void* buffer, size_t capacity_bytes) {
96  // Allocate a buffer internally.
97  if (buffer == nullptr) {
98  // We enforce 4B alignment, so we need to allocate 3 extra bytes to
99  // guarantee that the buffer has at least the requested capacity.
100  SetBuffer(nullptr, capacity_bytes + 3);
101  }
102  // Use user-provided storage.
103  else {
104  SetBuffer(buffer, capacity_bytes);
105  }
106 }
107 
108 /******************************************************************************/
109 FusionEngineFramer::~FusionEngineFramer() { ClearManagedBuffer(); }
110 
111 /******************************************************************************/
112 void FusionEngineFramer::SetBuffer(void* buffer, size_t capacity_bytes) {
113  if (capacity_bytes < sizeof(MessageHeader)) {
114  LOG(ERROR) << "FusionEngine framing buffer too small. [capacity="
115  << capacity_bytes << " B, min=" << sizeof(MessageHeader)
116  << " B]";
117  return;
118  }
119  // Restrict the buffer capacity to 2^31 bytes. We don't expect to ever have a
120  // single message anywhere near that large, and don't expect users to ever
121  // pass in a buffer that size. Using uint32_t instead of size_t internally
122  // makes it easier to guarantee behavior between 64b and 32b architectures. We
123  // do 2^31, not 2^32, so we can use int32_t for return values internally.
124  else if (capacity_bytes > 0x7FFFFFFF) {
125  LOG(WARNING) << "Limiting buffer capacity to 2^31 B. [original_capacity="
126  << capacity_bytes << " B]";
127  capacity_bytes = 0x7FFFFFFF;
128  }
129 
130  ClearManagedBuffer();
131  if (buffer == nullptr) {
132  buffer = new uint8_t[capacity_bytes];
133  is_buffer_managed_ = true;
134  }
135 
136  // Enforce 4B alignment at the beginning of the buffer.
137  uint8_t* buffer_unaligned = static_cast<uint8_t*>(buffer);
138  buffer_ = reinterpret_cast<uint8_t*>(
139  (reinterpret_cast<size_t>(buffer_unaligned) + 3) &
140  ~(static_cast<size_t>(3)));
141  capacity_bytes_ =
142  static_cast<uint32_t>(capacity_bytes - (buffer_ - buffer_unaligned));
143 
144  Reset();
145 }
146 
147 /******************************************************************************/
148 void FusionEngineFramer::Reset() {
149  state_ = State::SYNC0;
150  next_byte_index_ = 0;
151  current_message_size_ = 0;
152 }
153 
154 /******************************************************************************/
155 size_t FusionEngineFramer::OnData(const uint8_t* buffer, size_t length_bytes) {
156  // Process each byte. If the user-supplied buffer was too small, we can't
157  // parse messages.
158  if (buffer_ != nullptr) {
159  VLOG(2) << "Received " << length_bytes << " bytes.";
160  size_t total_dispatched_bytes = 0;
161  for (size_t idx = 0; idx < length_bytes; ++idx) {
162  uint8_t byte = buffer[idx];
163  buffer_[next_byte_index_++] = byte;
164  int32_t dispatched_message_size = OnByte(false);
165  if (dispatched_message_size == 0) {
166  // Waiting for more data. Nothing to do.
167  } else if (dispatched_message_size > 0) {
168  // Message framed successfully. Reset for the next one.
169  next_byte_index_ = 0;
170  total_dispatched_bytes += (size_t)dispatched_message_size;
171  } else if (next_byte_index_ > 0) {
172  // If OnByte() indicated an error (size < 0) and there is still data in
173  // the buffer, either the CRC failed or the payload was too big to fit
174  // in the buffer.
175  //
176  // In either case, it is possible we found what looked like the preamble
177  // somewhere within the data stream but it wasn't actually a valid
178  // message. In that case, the data we processed may contain the start of
179  // a valid message, or even one or more complete messages, starting
180  // somewhere after byte 0. Perform a resync operation to find valid
181  // messages.
182  total_dispatched_bytes += Resync();
183  } else {
184  // OnByte() caught an unrecoverable error and reset the buffer. Nothing
185  // to do.
186  }
187  }
188  return total_dispatched_bytes;
189  } else {
190  return 0;
191  }
192 }
193 
194 /******************************************************************************/
195 int32_t FusionEngineFramer::OnByte(bool quiet) {
196  // User-supplied buffer was too small. Can't parse messages.
197  if (buffer_ == nullptr) {
198  return 0;
199  }
200 
201  // If warnings are disabled, run in quiet mode.
202  if (!warn_on_error_) {
203  quiet = true;
204  }
205 
206  // Pull out the byte being processed.
207  if (next_byte_index_ == 0) {
208  LOG(ERROR) << "Byte not found in buffer.";
209  return 0;
210  }
211 
212  uint8_t byte = buffer_[next_byte_index_ - 1];
213 
214  // Look for the first sync byte.
215  //
216  // Note that we always put the first byte at offset 0 in the buffer, and the
217  // buffer is guaranteed to be 4B-aligned by the constructor, so the framed
218  // message will always be 4B aligned.
219  bool crc_check_needed = false;
220  if (state_ == State::SYNC0) {
221  VLOG(4) << "Searching for sync byte 0. [byte=" << HexPrintableInteger(byte)
222  << "]";
223  if (byte == MessageHeader::SYNC0) {
224  VLOG(4) << "Found sync byte 0.";
225  state_ = State::SYNC1;
226  } else {
227  --next_byte_index_;
228  }
229  }
230  // Look for the second sync byte.
231  else if (state_ == State::SYNC1) {
232  VLOG(4) << "Searching for sync byte 1. [byte=" << HexPrintableInteger(byte)
233  << "]";
234  if (byte == MessageHeader::SYNC0) {
235  VLOG(4) << "Found duplicate sync byte 0.";
236  state_ = State::SYNC1;
237  --next_byte_index_;
238  } else if (byte == MessageHeader::SYNC1) {
239  VLOG(3) << "Preamble found. Waiting for header.";
240  state_ = State::HEADER;
241  } else {
242  VLOG(4) << "Did not find sync byte 1. Resetting. [byte="
243  << HexPrintableInteger(byte) << "]";
244  state_ = State::SYNC0;
245  next_byte_index_ = 0;
246  current_message_size_ = 0;
247  }
248  }
249  // Search for a message header.
250  else if (state_ == State::HEADER) {
251  VLOG(4) << "Received " << next_byte_index_ << "/" << sizeof(MessageHeader)
252  << " header bytes. [byte=" << HexPrintableInteger(byte) << "]";
253 
254  // Check if the header is complete.
255  if (next_byte_index_ == sizeof(MessageHeader)) {
256  auto* header = reinterpret_cast<MessageHeader*>(buffer_);
257  current_message_size_ =
258  sizeof(MessageHeader) + header->payload_size_bytes;
259  VLOG(3) << "Header complete. Waiting for payload. [message="
260  << header->message_type << ", seq=" << header->sequence_number
261  << ", payload_size=" << header->payload_size_bytes
262  << " B, message_size=" << current_message_size_ << " B]";
263 
264  // Check for overflow of the messge size uint32_t variable. We don't
265  // currently expect to have _extremely_ large packets, so this should
266  // never happen for a valid message. If it does, assume this is not a
267  // valid message, and instead the sync pattern likely showed up at random
268  // in the data stream.
269  if (current_message_size_ < header->payload_size_bytes) {
270  if (quiet) {
271  VLOG(2) << "Message size overflow. Dropping suspected invalid sync. "
272  "[size="
273  << current_message_size_
274  << " B (payload=" << header->payload_size_bytes << " B)]";
275  } else {
276  LOG(WARNING)
277  << "Message size overflow. Dropping suspected invalid sync. "
278  "[size="
279  << current_message_size_
280  << " B (payload=" << header->payload_size_bytes << " B)]";
281  }
282 
283  state_ = State::SYNC0;
284  return -1;
285  }
286  // The reserved bytes in the header are currently always set to 0. If the
287  // incoming bytes are not zero, assume this an invalid sync.
288  //
289  // This may change in the future, but for now it prevents us from
290  // needing to collect a ton of bytes before performing a CRC check if a
291  // bogus header from an invalid sync has a very large payload size that
292  // happens to still be smaller than the buffer size.
293  else if (header->reserved[0] != 0 || header->reserved[1] != 0) {
294  if (quiet) {
295  VLOG(2) << "Reserved bytes nonzero. Dropping suspected invalid sync. "
296  "[size="
297  << current_message_size_
298  << " B (payload=" << header->payload_size_bytes << " B)]";
299  } else {
300  LOG(WARNING) << "Reserved bytes nonzero. Dropping suspected invalid "
301  "sync. [size="
302  << current_message_size_
303  << " B (payload=" << header->payload_size_bytes
304  << " B)]";
305  }
306 
307  state_ = State::SYNC0;
308  return -1;
309  }
310  // If the message is too large to fit in the buffer, we cannot parse it.
311  //
312  // If this is an invalid sync, the parsed (invalid) payload length may
313  // exceed the buffer size and the invalid header will be dropped. If it
314  // does not exceed the buffer size, it'll get caught later during the CRC
315  // check.
316  else if (current_message_size_ > capacity_bytes_) {
317  if (quiet) {
318  VLOG(2) << "Message too large for buffer. [size="
319  << current_message_size_
320  << " B (payload=" << header->payload_size_bytes
321  << " B), buffer_capacity=" << capacity_bytes_
322  << " B (max_payload="
323  << capacity_bytes_ - sizeof(MessageHeader) << " B)]";
324  } else {
325  LOG(WARNING) << "Message too large for buffer. [size="
326  << current_message_size_
327  << " B (payload=" << header->payload_size_bytes
328  << " B), buffer_capacity=" << capacity_bytes_
329  << " B (max_payload="
330  << capacity_bytes_ - sizeof(MessageHeader) << " B)]";
331  }
332 
333  state_ = State::SYNC0;
334  return -1;
335  }
336  // Sanity checks passed. Start collecting the message payload next.
337  else {
338  // If there's no payload, do the CRC check now.
339  if (header->payload_size_bytes == 0) {
340  VLOG(3) << "Message has no payload. Checking CRC.";
341  crc_check_needed = true;
342  }
343  // Otherwise, collect the payload, then do the CRC check.
344  else {
345  state_ = State::DATA;
346  }
347  }
348  }
349  }
350  // Collect the message payload.
351  else if (state_ == State::DATA) {
352  VLOG(4) << "Received " << next_byte_index_ << "/" << current_message_size_
353  << " message bytes (" << next_byte_index_ - sizeof(MessageHeader)
354  << "/" << current_message_size_ - sizeof(MessageHeader)
355  << " payload bytes). [byte=" << HexPrintableInteger(byte) << "]";
356 
357  // If we received the full payload, check the CRC and dispatch it.
358  if (next_byte_index_ == current_message_size_) {
359  VLOG(3) << "Payload complete. Checking CRC.";
360  crc_check_needed = true;
361  }
362  }
363  // Illegal state.
364  else {
365  LOG(ERROR) << "Impossible parsing state.";
366  Reset();
367  return -1;
368  }
369 
370  // Payload complete (or message has no payload). Check the CRC.
371  if (crc_check_needed) {
372  uint32_t crc = CalculateCRC(buffer_);
373  auto* header = reinterpret_cast<MessageHeader*>(buffer_);
374  if (crc == header->crc) {
375  VLOG(1) << "CRC passed. Dispatching message. [message="
376  << header->message_type << " (" << (unsigned)header->message_type
377  << "), seq=" << header->sequence_number
378  << ", size=" << current_message_size_
379  << " B, crc=" << HexPrintableInteger(crc) << "]";
380  if (callback_) {
381  auto* payload = reinterpret_cast<uint8_t*>(header + 1);
382  callback_(*header, payload);
383  }
384  state_ = State::SYNC0;
385  return static_cast<int32_t>(current_message_size_);
386  } else {
387  if (quiet) {
388  VLOG(2) << "CRC check failed. [message=" << header->message_type << " ("
389  << (unsigned)header->message_type
390  << "), seq=" << header->sequence_number
391  << ", size=" << current_message_size_
392  << " B, crc=" << HexPrintableInteger(crc)
393  << ", expected_crc=" << HexPrintableInteger(header->crc) << "]";
394  } else {
395  LOG(WARNING) << "CRC check failed. [message=" << header->message_type
396  << " (" << (unsigned)header->message_type
397  << "), seq=" << header->sequence_number
398  << ", size=" << current_message_size_
399  << " B, crc=" << HexPrintableInteger(crc)
400  << ", expected_crc=" << HexPrintableInteger(header->crc)
401  << "]";
402  }
403  state_ = State::SYNC0;
404  return -1;
405  }
406  }
407 
408  // No messages completed.
409  return 0;
410 }
411 
412 /******************************************************************************/
413 uint32_t FusionEngineFramer::Resync() {
414  // If the message preamble shows up randomly somewhere in the data stream, we
415  // may sync to it and try to parse a message starting at that arbitrary
416  // location. We will eventually detect the bad sync either by CRC failure or
417  // because the payload size we extract from the bogus message header is too
418  // large.
419  //
420  // In either case, the bytes we collected - both the bogus header and the
421  // payload bytes based on the size in the header - may contain the start of a
422  // valid message, or even one or more complete messages. We need to resync and
423  // try to find those messages. Any of the following scenarios is possible:
424  // ...validvalidval <-- Contiguous valid messages
425  // ...valid...valid...val <-- Multiple valid messages, separated by invalid
426  // ...valid...valid... <-- Similar, but ending with invalid
427  // ...val <-- Start of a valid message, no complete messages
428  // ... <-- No valid content
429  //
430  // Ideally, we would search for these messages in-place and avoid shifting the
431  // data around in the buffer. However, since we need the messages to be
432  // 4B-aligned and there's only a 1-in-4 chance of that happening naturally,
433  // we'd have to shift the data 75% of the time regardless.
434  //
435  // Given that, we simply shift all data left in the buffer and process one
436  // message at a time. This is not as efficient, but it's the simplest option.
437  uint32_t available_bytes = next_byte_index_;
438  VLOG(1) << "Attempting resynchronization. [" << available_bytes - 1
439  << " candidate bytes]";
440  uint32_t total_message_size = 0;
441  state_ = State::SYNC0;
442  next_byte_index_ = 0;
443  for (uint32_t offset = 1; offset < available_bytes; ++offset) {
444  uint8_t current_byte = buffer_[offset];
445 
446  // Skip forward until we see a SYNC0.
447  if (state_ == State::SYNC0) {
448  if (current_byte == MessageHeader::SYNC0) {
449  VLOG(1) << "Candidate message start found @ offset " << offset << "/"
450  << available_bytes << ".";
451  // Shift all of the data left in the buffer.
452  available_bytes -= offset;
453  std::memmove(buffer_, buffer_ + offset, available_bytes);
454  offset = 0;
455  } else {
456  VLOG(4) << "Skipping non-sync byte 0 @ offset " << offset << "/"
457  << available_bytes
458  << ". [byte=" << HexPrintableInteger(current_byte) << "]";
459  continue;
460  }
461  }
462 
463  // Process this byte. If we end up back in the SYNC0 state, either A) the
464  // SYNC0 we found was a valid message and got dispatched, or B) was not the
465  // start of a valid message.
466  //
467  // In (A), message_size > 0 indicating there was a valid message, so we know
468  // we can just keep going with the rest of the data in the buffer.
469  //
470  // In (B), message_size == 0. In that case we'll rewind back to the byte
471  // just after we located the SYNC0 and see if there's another one.
472  //
473  // Note that next_byte_index_ always points to the next open slot, i.e.,
474  // one byte _after_ the current byte.
475  next_byte_index_ = offset + 1;
476  int32_t message_size = OnByte(true);
477 
478  if (state_ == State::SYNC0) {
479  // Note that offset will be incremented when we loop around, so we set it
480  // to N-1, where N is wherever we want to start searching next.
481  if (message_size > 0) {
482  total_message_size += message_size;
483  offset = message_size - 1;
484  VLOG(1)
485  << "Resync found a complete message. Continuing search @ offset "
486  << offset + 1 << "/" << available_bytes
487  << ". [message_size=" << message_size << ", "
488  << (available_bytes - message_size - 1)
489  << " candidate bytes remaining]";
490  } else {
491  size_t prev_offset = offset;
492  offset = 0;
493  VLOG(1) << "Candidate message rejected after " << prev_offset
494  << " bytes. Restarting search @ offset " << offset + 1 << "/"
495  << available_bytes << ". [" << available_bytes - 1
496  << " candidate bytes remaining]";
497  }
498 
499  next_byte_index_ = 0;
500  }
501  }
502 
503  VLOG(1) << "Resynchronization finished. " << next_byte_index_
504  << " bytes remaining in buffer.";
505 
506  return total_message_size;
507 }
508 
509 /******************************************************************************/
510 void FusionEngineFramer::ClearManagedBuffer() {
511  if (is_buffer_managed_ && buffer_ != nullptr) {
512  delete[] buffer_;
513  is_buffer_managed_ = false;
514  buffer_ = nullptr;
515  }
516 }
static constexpr uint8_t SYNC0
Definition: defs.h:532
uint32_t CalculateCRC(const void *buffer, size_t length, uint32_t initial_value)
Calculate the CRC for the message (payload) contained in the buffer.
Definition: crc.cc:45
The header present at the beginning of every message.
Definition: defs.h:531
#define VLOG(verboselevel)
Definition: logging.h:88
Message CRC support.
HexPrintableIntegerInst< T > HexPrintableInteger(T value)
Wrap an integer so it will be output to a stream as its hex representation.
Definition: configuration.h:14
static constexpr uint8_t SYNC1
Definition: defs.h:533
FusionEngine message framer.
HexPrintableIntegerInst(T value)
std::ostream p1_ostream
Definition: portability.h:75
p1_ostream & operator<<(p1_ostream &stream, ConfigType type)
ConfigType stream operator.
uint32_t payload_size_bytes
The size of the serialized message (bytes).
Definition: defs.h:572
friend p1_ostream & operator<<(p1_ostream &, const HexPrintableIntegerInst< U > &)
const T value_
#define LOG(severity)
Definition: logging.h:77
API wrapper for optional compilation of logging support.