fusion_engine_framer.cc
Go to the documentation of this file.
1 /**************************************************************************/ /**
2  * @brief FusionEngine message framer.
3  * @file
4  ******************************************************************************/
5 
6 #define P1_VMODULE_NAME fusion_engine_framer
7 
9 
10 #include <cstring> // For memmove()
11 #include <iomanip>
12 #include <ostream>
13 
16 
18 using namespace point_one::fusion_engine::parsers;
19 
20 /******************************************************************************/
22  public:
23  uint8_t byte_;
24 
25  PrintableByte(uint8_t byte) : byte_(byte) {}
26 
27  friend std::ostream& operator<<(std::ostream& stream,
28  const PrintableByte& obj) {
29  stream << "0x" << std::hex << std::setfill('0') << std::setw(2)
30  << (unsigned)obj.byte_ << std::dec;
31  if (obj.byte_ >= 0x20 && obj.byte_ <= 0x7E) {
32  stream << " ('" << (char)obj.byte_ << "')";
33  } else {
34  stream << " (---)";
35  }
36  return stream;
37  }
38 };
39 
40 /******************************************************************************/
41 FusionEngineFramer::FusionEngineFramer(void* buffer, size_t capacity_bytes) {
42  // Allocate a buffer internally.
43  if (buffer == nullptr) {
44  // We enforce 4B alignment, so we need to allocate 3 extra bytes to
45  // guarantee that the buffer has at least the requested capacity.
46  SetBuffer(nullptr, capacity_bytes + 3);
47  }
48  // Use user-provided storage.
49  else {
50  SetBuffer(buffer, capacity_bytes);
51  }
52 }
53 
54 /******************************************************************************/
55 void FusionEngineFramer::SetBuffer(void* buffer, size_t capacity_bytes) {
56  if (capacity_bytes < sizeof(MessageHeader)) {
57  LOG(ERROR) << "FusionEngine framing buffer too small. [capacity="
58  << capacity_bytes << " B, min=" << sizeof(MessageHeader)
59  << " B]";
60  return;
61  }
62  // Restrict the buffer capacity to 2^31 bytes. We don't expect to ever have a
63  // single message anywhere near that large, and don't expect users to ever
64  // pass in a buffer that size. Using uint32_t instead of size_t internally
65  // makes it easier to guarantee behavior between 64b and 32b architectures. We
66  // do 2^31, not 2^32, so we can use int32_t for return values internally.
67  else if (capacity_bytes > 0x7FFFFFFF) {
68  LOG(WARNING) << "Limiting buffer capacity to 2^31 B. [original_capacity="
69  << capacity_bytes << " B]";
70  capacity_bytes = 0x7FFFFFFF;
71  }
72 
73  if (buffer == nullptr) {
74  managed_buffer_.reset(new uint8_t[capacity_bytes]);
75  buffer = managed_buffer_.get();
76  } else if (buffer != managed_buffer_.get()) {
77  managed_buffer_.reset(nullptr);
78  }
79 
80  // Enforce 4B alignment at the beginning of the buffer.
81  uint8_t* buffer_unaligned = static_cast<uint8_t*>(buffer);
82  buffer_ = reinterpret_cast<uint8_t*>(
83  (reinterpret_cast<size_t>(buffer_unaligned) + 3) &
84  ~(static_cast<size_t>(3)));
85  capacity_bytes_ =
86  static_cast<uint32_t>(capacity_bytes - (buffer_ - buffer_unaligned));
87 
88  Reset();
89 }
90 
91 /******************************************************************************/
92 void FusionEngineFramer::Reset() {
93  state_ = State::SYNC0;
94  next_byte_index_ = 0;
95  current_message_size_ = 0;
96 }
97 
98 /******************************************************************************/
99 size_t FusionEngineFramer::OnData(const uint8_t* buffer, size_t length_bytes) {
100  // Process each byte. If the user-supplied buffer was too small, we can't
101  // parse messages.
102  if (buffer_ != nullptr) {
103  VLOG(2) << "Received " << length_bytes << " bytes.";
104  size_t total_dispatched_bytes = 0;
105  for (size_t idx = 0; idx < length_bytes; ++idx) {
106  uint8_t byte = buffer[idx];
107  buffer_[next_byte_index_++] = byte;
108  int32_t dispatched_message_size = OnByte(false);
109  if (dispatched_message_size == 0) {
110  // Waiting for more data. Nothing to do.
111  } else if (dispatched_message_size > 0) {
112  // Message framed successfully. Reset for the next one.
113  next_byte_index_ = 0;
114  total_dispatched_bytes += (size_t)dispatched_message_size;
115  } else if (next_byte_index_ > 0) {
116  // If OnByte() indicated an error (size < 0) and there is still data in
117  // the buffer, either the CRC failed or the payload was too big to fit
118  // in the buffer.
119  //
120  // In either case, it is possible we found what looked like the preamble
121  // somewhere within the data stream but it wasn't actually a valid
122  // message. In that case, the data we processed may contain the start of
123  // a valid message, or even one or more complete messages, starting
124  // somewhere after byte 0. Perform a resync operation to find valid
125  // messages.
126  total_dispatched_bytes += Resync();
127  } else {
128  // OnByte() caught an unrecoverable error and reset the buffer. Nothing
129  // to do.
130  }
131  }
132  return total_dispatched_bytes;
133  } else {
134  return 0;
135  }
136 }
137 
138 /******************************************************************************/
139 int32_t FusionEngineFramer::OnByte(bool quiet) {
140  // User-supplied buffer was too small. Can't parse messages.
141  if (buffer_ == nullptr) {
142  return 0;
143  }
144 
145  // If warnings are disabled, run in quiet mode.
146  if (!warn_on_error_) {
147  quiet = true;
148  }
149 
150  // Pull out the byte being processed.
151  if (next_byte_index_ == 0) {
152  LOG(ERROR) << "Byte not found in buffer.";
153  return 0;
154  }
155 
156  uint8_t byte = buffer_[next_byte_index_ - 1];
157 
158  // Look for the first sync byte.
159  //
160  // Note that we always put the first byte at offset 0 in the buffer, and the
161  // buffer is guaranteed to be 4B-aligned by the constructor, so the framed
162  // message will always be 4B aligned.
163  bool crc_check_needed = false;
164  if (state_ == State::SYNC0) {
165  VLOG(4) << "Searching for sync byte 0. [byte=" << PrintableByte(byte)
166  << "]";
167  if (byte == MessageHeader::SYNC0) {
168  VLOG(4) << "Found sync byte 0.";
169  state_ = State::SYNC1;
170  } else {
171  --next_byte_index_;
172  }
173  }
174  // Look for the second sync byte.
175  else if (state_ == State::SYNC1) {
176  VLOG(4) << "Searching for sync byte 1. [byte=" << PrintableByte(byte)
177  << "]";
178  if (byte == MessageHeader::SYNC0) {
179  VLOG(4) << "Found duplicate sync byte 0.";
180  state_ = State::SYNC1;
181  --next_byte_index_;
182  } else if (byte == MessageHeader::SYNC1) {
183  VLOG(3) << "Preamble found. Waiting for header.";
184  state_ = State::HEADER;
185  } else {
186  VLOG(4) << "Did not find sync byte 1. Resetting. [byte="
187  << PrintableByte(byte) << "]";
188  state_ = State::SYNC0;
189  next_byte_index_ = 0;
190  current_message_size_ = 0;
191  }
192  }
193  // Search for a message header.
194  else if (state_ == State::HEADER) {
195  VLOG(4) << "Received " << next_byte_index_ << "/" << sizeof(MessageHeader)
196  << " header bytes. [byte=" << PrintableByte(byte) << "]";
197 
198  // Check if the header is complete.
199  if (next_byte_index_ == sizeof(MessageHeader)) {
200  // Compute the full message size. If the message is too large to fit in
201  // the buffer, we cannot parse it. Otherwise, start collecting the
202  // message payload.
203  //
204  // Note that while we compute the current_message_size_ here, we
205  // intentionally do the "too big" check below with the payload size. That
206  // way we implicitly handle cases where the payload is large enough to
207  // cause current_message_size_ to overflow. Normally, this won't happen
208  // for legit packets that are just too big for the user's buffer, but it
209  // could happen on a bogus header if we find the preamble randomly in an
210  // incoming byte stream. The buffer capacity is always
211  // >=sizeof(MessageHeader), so the subtraction will never be negative.
212  auto* header = reinterpret_cast<MessageHeader*>(buffer_);
213  current_message_size_ =
214  sizeof(MessageHeader) + header->payload_size_bytes;
215  VLOG(3) << "Header complete. Waiting for payload. [message="
216  << header->message_type << " (" << (unsigned)header->message_type
217  << "), seq=" << header->sequence_number
218  << ", payload_size=" << header->payload_size_bytes << " B]";
219  if (header->payload_size_bytes <=
220  capacity_bytes_ - sizeof(MessageHeader)) {
221  // If there's no payload, do the CRC check now.
222  if (header->payload_size_bytes == 0) {
223  VLOG(3) << "Message has no payload. Checking CRC.";
224  crc_check_needed = true;
225  }
226  // Otherwise, collect the payload, then do the CRC check.
227  else {
228  state_ = State::DATA;
229  }
230  } else {
231  if (quiet) {
232  VLOG(2) << "Message too large for buffer. [size="
233  << current_message_size_
234  << " B (payload=" << header->payload_size_bytes
235  << " B), buffer_capacity=" << capacity_bytes_
236  << " B (max_payload="
237  << capacity_bytes_ - sizeof(MessageHeader) << " B)]";
238  } else {
239  LOG(WARNING) << "Message too large for buffer. [size="
240  << current_message_size_
241  << " B (payload=" << header->payload_size_bytes
242  << " B), buffer_capacity=" << capacity_bytes_
243  << " B (max_payload="
244  << capacity_bytes_ - sizeof(MessageHeader) << " B)]";
245  }
246 
247  state_ = State::SYNC0;
248  return -1;
249  }
250  }
251  }
252  // Collect the message payload.
253  else if (state_ == State::DATA) {
254  VLOG(4) << "Received " << next_byte_index_ << "/" << current_message_size_
255  << " message bytes (" << next_byte_index_ - sizeof(MessageHeader)
256  << "/" << current_message_size_ - sizeof(MessageHeader)
257  << " payload bytes). [byte=" << PrintableByte(byte) << "]";
258 
259  // If we received the full payload, check the CRC and dispatch it.
260  if (next_byte_index_ == current_message_size_) {
261  VLOG(3) << "Payload complete. Checking CRC.";
262  crc_check_needed = true;
263  }
264  }
265  // Illegal state.
266  else {
267  LOG(ERROR) << "Impossible parsing state.";
268  Reset();
269  return -1;
270  }
271 
272  // Payload complete (or message has no payload). Check the CRC.
273  if (crc_check_needed) {
274  uint32_t crc = CalculateCRC(buffer_);
275  auto* header = reinterpret_cast<MessageHeader*>(buffer_);
276  if (crc == header->crc) {
277  VLOG(1) << "CRC passed. Dispatching message. [message="
278  << header->message_type << " (" << (unsigned)header->message_type
279  << "), seq=" << header->sequence_number
280  << ", size=" << current_message_size_ << " B, crc=0x" << std::hex
281  << std::setfill('0') << std::setw(8) << crc << "]";
282  if (callback_) {
283  auto* payload = reinterpret_cast<uint8_t*>(header + 1);
284  callback_(*header, payload);
285  }
286  state_ = State::SYNC0;
287  return static_cast<int32_t>(current_message_size_);
288  } else {
289  if (quiet) {
290  VLOG(2) << "CRC check failed. [message=" << header->message_type << " ("
291  << (unsigned)header->message_type
292  << "), seq=" << header->sequence_number
293  << ", size=" << current_message_size_ << " B, crc=0x"
294  << std::hex << std::setfill('0') << std::setw(8) << crc
295  << ", expected_crc=0x" << std::setw(8) << header->crc << "]";
296  } else {
297  LOG(WARNING) << "CRC check failed. [message=" << header->message_type
298  << " (" << (unsigned)header->message_type
299  << "), seq=" << header->sequence_number
300  << ", size=" << current_message_size_ << " B, crc=0x"
301  << std::hex << std::setfill('0') << std::setw(8) << crc
302  << ", expected_crc=0x" << std::setw(8) << header->crc
303  << "]";
304  }
305  state_ = State::SYNC0;
306  return -1;
307  }
308  }
309 
310  // No messages completed.
311  return 0;
312 }
313 
314 /******************************************************************************/
315 uint32_t FusionEngineFramer::Resync() {
316  // If the message preamble shows up randomly somewhere in the data stream, we
317  // may sync to it and try to parse a message starting at that arbitrary
318  // location. We will eventually detect the bad sync either by CRC failure or
319  // because the payload size we extract from the bogus message header is too
320  // large.
321  //
322  // In either case, the bytes we collected - both the bogus header and the
323  // payload bytes based on the size in the header - may contain the start of a
324  // valid message, or even one or more complete messages. We need to resync and
325  // try to find those messages. Any of the following scenarios is possible:
326  // ...validvalidval <-- Contiguous valid messages
327  // ...valid...valid...val <-- Multiple valid messages, separated by invalid
328  // ...valid...valid... <-- Similar, but ending with invalid
329  // ...val <-- Start of a valid message, no complete messages
330  // ... <-- No valid content
331  //
332  // Ideally, we would search for these messages in-place and avoid shifting the
333  // data around in the buffer. However, since we need the messages to be
334  // 4B-aligned and there's only a 1-in-4 chance of that happening naturally,
335  // we'd have to shift the data 75% of the time regardless.
336  //
337  // Given that, we simply shift all data left in the buffer and process one
338  // message at a time. This is not as efficient, but it's the simplest option.
339  uint32_t available_bytes = next_byte_index_;
340  VLOG(1) << "Attempting resynchronization. [" << available_bytes - 1
341  << " candidate bytes]";
342  uint32_t total_message_size = 0;
343  state_ = State::SYNC0;
344  next_byte_index_ = 0;
345  for (uint32_t offset = 1; offset < available_bytes; ++offset) {
346  uint8_t current_byte = buffer_[offset];
347 
348  // Skip forward until we see a SYNC0.
349  if (state_ == State::SYNC0) {
350  if (current_byte == MessageHeader::SYNC0) {
351  VLOG(1) << "Candidate message start found @ offset " << offset << "/"
352  << available_bytes << ".";
353  // Shift all of the data left in the buffer.
354  available_bytes -= offset;
355  std::memmove(buffer_, buffer_ + offset, available_bytes);
356  offset = 0;
357  } else {
358  VLOG(4) << "Skipping non-sync byte 0 @ offset " << offset << "/"
359  << available_bytes << ". [byte=" << PrintableByte(current_byte)
360  << "]";
361  continue;
362  }
363  }
364 
365  // Process this byte. If we end up back in the SYNC0 state, either A) the
366  // SYNC0 we found was a valid message and got dispatched, or B) was not the
367  // start of a valid message.
368  //
369  // In (A), message_size > 0 indicating there was a valid message, so we know
370  // we can just keep going with the rest of the data in the buffer.
371  //
372  // In (B), message_size == 0. In that case we'll rewind back to the byte
373  // just after we located the SYNC0 and see if there's another one.
374  //
375  // Note that next_byte_index_ always points to the next open slot, i.e.,
376  // one byte _after_ the current byte.
377  next_byte_index_ = offset + 1;
378  int32_t message_size = OnByte(true);
379 
380  if (state_ == State::SYNC0) {
381  // Note that offset will be incremented when we loop around, so we set it
382  // to N-1, where N is wherever we want to start searching next.
383  if (message_size > 0) {
384  total_message_size += message_size;
385  offset = message_size - 1;
386  VLOG(1)
387  << "Resync found a complete message. Continuing search @ offset "
388  << offset + 1 << "/" << available_bytes
389  << ". [message_size=" << message_size << ", "
390  << (available_bytes - message_size - 1)
391  << " candidate bytes remaining]";
392  } else {
393  size_t prev_offset = offset;
394  offset = 0;
395  VLOG(1) << "Candidate message rejected after " << prev_offset
396  << " bytes. Restarting search @ offset " << offset + 1 << "/"
397  << available_bytes << ". [" << available_bytes - 1
398  << " candidate bytes remaining]";
399  }
400 
401  next_byte_index_ = 0;
402  }
403  }
404 
405  VLOG(1) << "Resynchronization finished. " << next_byte_index_
406  << " bytes remaining in buffer.";
407 
408  return total_message_size;
409 }
static constexpr uint8_t SYNC0
Definition: defs.h:447
The header present at the beginning of every message.
Definition: defs.h:446
#define VLOG(verboselevel)
Definition: logging.h:82
Message CRC support.
uint32_t CalculateCRC(const void *buffer)
Calculate the CRC for the message (header + payload) contained in the buffer.
Definition: crc.cc:57
PrintableByte(uint8_t byte)
Definition: configuration.h:23
static constexpr uint8_t SYNC1
Definition: defs.h:448
FusionEngine message framer.
friend std::ostream & operator<<(std::ostream &stream, const PrintableByte &obj)
uint8_t byte_
uint32_t payload_size_bytes
The size of the serialized message (bytes).
Definition: defs.h:487
#define LOG(severity)
Definition: logging.h:71
API wrapper for optional compilation of logging support.