FusionEngineFramer Class

Frame and validate incoming FusionEngine messages.

Declaration

class point_one::fusion_engine::parsers::FusionEngineFramer { ... }

Included Headers

Public Member Typedefs Index

using RawMessageCallback = void(*)(void *context, const messages::MessageHeader &header, const void *payload)

Enumerations Index

enum class State { ... }

Public Constructors Index

P1_EXPORT FusionEngineFramer ()=default

Construct a framer instance with no buffer allocated.

FusionEngineFramer (const FusionEngineFramer &)=delete
FusionEngineFramer (FusionEngineFramer &&)=delete
P1_EXPORT FusionEngineFramer (size_t capacity_bytes)

Construct a framer instance with an internally allocated buffer.

P1_EXPORT FusionEngineFramer (void *buffer, size_t capacity_bytes)

Construct a framer instance with a user-specified buffer.

Public Destructor Index

P1_EXPORT ~FusionEngineFramer ()

Public Operators Index

FusionEngineFramer &operator= (const FusionEngineFramer &)=delete
FusionEngineFramer &operator= (FusionEngineFramer &&)=delete

Public Member Functions Index

P1_EXPORT size_t OnData (const uint8_t *buffer, size_t length_bytes)

Process incoming data.

P1_EXPORT void Reset ()

Reset the framer and discard all pending data.

P1_EXPORT void SetBuffer (void *buffer, size_t capacity_bytes)

Set the buffer to use for message framing.

P1_EXPORT void SetMessageCallback (RawMessageCallback callback, void *context)

Specify a function to be called when a message is framed.

P1_EXPORT void WarnOnError (bool enabled)

Enable/disable warnings for CRC and "message too large" failures.

Private Member Functions Index

void ClearManagedBuffer ()

Free the buffer_ if it's being managed internally.

int32_t OnByte (bool quiet)

Process a single byte.

uint32_t Resync ()

Perform a resynchronization operation starting at buffer_[1].

Private Member Attributes Index

uint8_t *buffer_ {nullptr}
uint32_t capacity_bytes_ {0}
uint32_t current_message_size_ {0}
bool is_buffer_managed_ = false
uint32_t next_byte_index_ {0}
RawMessageCallback raw_callback_ = nullptr
void *raw_callback_context_ = nullptr
State state_ {State::SYNC0}
bool warn_on_error_ = true

Description

Frame and validate incoming FusionEngine messages.

This class locates and validates FusionEngine messages within a stream of binary data. Data may be stored in an internally allocated buffer, or in an external buffer supplied by the user.

The callback function provided to SetMessageCallback() will be called each time a complete message is received. Any messages that do not pass the CRC check, or that are too big to be stored in the data buffer, will be discarded.

Example usage:

 void MessageReceived(const MessageHeader& header, const void* payload) {
   if (header.message_type == MessageType::POSE) {
     auto& contents = *static_cast<const PoseMessage*>(payload);
     ...
   }
 }

 framer.SetMessageCallback(MessageReceived);

Definition at line 48 of file fusion_engine_framer.h.

Public Member Typedefs

RawMessageCallback

using point_one::fusion_engine::parsers::FusionEngineFramer::RawMessageCallback = void (*)(void* context, const messages::MessageHeader& header, const void* payload)

Definition at line 54 of file fusion_engine_framer.h.

56 const void* payload);

Enumerations

State

enum class point_one::fusion_engine::parsers::FusionEngineFramer::State
strong
Enumeration values
SYNC0 (= 0)
SYNC1 (= 1)
HEADER (= 2)
DATA (= 3)

Definition at line 163 of file fusion_engine_framer.h.

163 enum class State {
164 SYNC0 = 0,
165 SYNC1 = 1,
166 HEADER = 2,
167 DATA = 3,
168 };
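
The first two states drive the search for the message preamble, as the OnByte() listing further down shows. A minimal sketch of those transitions follows. The sync byte values 0x2E and 0x31 are assumptions made for illustration (the library reads them from MessageHeader::SYNC0 and MessageHeader::SYNC1), and NextState() is a hypothetical helper, not part of the class.

```cpp
#include <cassert>
#include <cstdint>

enum class State { SYNC0 = 0, SYNC1 = 1, HEADER = 2, DATA = 3 };

// Assumed preamble bytes, for illustration only.
constexpr std::uint8_t kSync0 = 0x2E;
constexpr std::uint8_t kSync1 = 0x31;

// Advance the preamble search by one byte (hypothetical helper).
inline State NextState(State state, std::uint8_t byte) {
  switch (state) {
    case State::SYNC0:
      // Keep scanning until the first sync byte shows up.
      return byte == kSync0 ? State::SYNC1 : State::SYNC0;
    case State::SYNC1:
      // A repeated first sync byte may still start a message: stay here.
      if (byte == kSync0) return State::SYNC1;
      // Anything other than the second sync byte restarts the search.
      return byte == kSync1 ? State::HEADER : State::SYNC0;
    default:
      // HEADER and DATA advance on byte counts, not byte values.
      assert(state == State::HEADER || state == State::DATA);
      return state;
  }
}
```

Keeping the SYNC1 state on a duplicate first sync byte means a stream such as SYNC0, SYNC0, SYNC1 still locks onto the second SYNC0 instead of discarding it.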

Public Constructors

FusionEngineFramer()

P1_EXPORT point_one::fusion_engine::parsers::FusionEngineFramer::FusionEngineFramer ()
default

Construct a framer instance with no buffer allocated.

Note: You must call SetBuffer() to assign a buffer; otherwise, all incoming data will be discarded.

Definition at line 65 of file fusion_engine_framer.h.

FusionEngineFramer()

point_one::fusion_engine::parsers::FusionEngineFramer::FusionEngineFramer (const FusionEngineFramer &)
delete

Definition at line 90 of file fusion_engine_framer.h.

FusionEngineFramer()

point_one::fusion_engine::parsers::FusionEngineFramer::FusionEngineFramer (FusionEngineFramer &&)
delete

Definition at line 91 of file fusion_engine_framer.h.

FusionEngineFramer()

P1_EXPORT point_one::fusion_engine::parsers::FusionEngineFramer::FusionEngineFramer (size_t capacity_bytes)
inline explicit

Construct a framer instance with an internally allocated buffer.

Parameters
capacity_bytes

The maximum framing buffer capacity (in bytes).

Definition at line 72 of file fusion_engine_framer.h.

FusionEngineFramer()

FusionEngineFramer::FusionEngineFramer (void * buffer, size_t capacity_bytes)

Construct a framer instance with a user-specified buffer.

Postcondition

buffer must exist for the lifetime of this instance.

Parameters
buffer

The framing buffer to use. Set to nullptr to allocate a buffer internally.

capacity_bytes

The maximum framing buffer capacity (in bytes).

Declaration at line 85 of file fusion_engine_framer.h, definition at line 95 of file fusion_engine_framer.cc.

96 // Allocate a buffer internally.
97 if (buffer == nullptr) {
98 // We enforce 4B alignment, so we need to allocate 3 extra bytes to
99 // guarantee that the buffer has at least the requested capacity.
100 SetBuffer(nullptr, capacity_bytes + 3);
101 }
102 // Use user-provided storage.
103 else {
105 }
106}

Public Destructor

~FusionEngineFramer()

FusionEngineFramer::~FusionEngineFramer ()

Public Operators

operator=()

FusionEngineFramer & point_one::fusion_engine::parsers::FusionEngineFramer::operator= (const FusionEngineFramer &)
delete

Definition at line 92 of file fusion_engine_framer.h.

operator=()

FusionEngineFramer & point_one::fusion_engine::parsers::FusionEngineFramer::operator= (FusionEngineFramer &&)
delete

Definition at line 94 of file fusion_engine_framer.h.

Public Member Functions

OnData()

size_t FusionEngineFramer::OnData (const uint8_t * buffer, size_t length_bytes)

Process incoming data.

Parameters
buffer

A buffer containing data to be framed.

length_bytes

The number of bytes to be framed.

Returns

The total size, in bytes, of all valid, complete messages dispatched by this call, or 0 if no messages were completed.

Declaration at line 160 of file fusion_engine_framer.h, definition at line 155 of file fusion_engine_framer.cc.

156 // Process each byte. If the user-supplied buffer was too small, we can't
157 // parse messages.
158 if (buffer_ != nullptr) {
159 VLOG(2) << "Received " << length_bytes << " bytes.";
160 size_t total_dispatched_bytes = 0;
161 for (size_t idx = 0; idx < length_bytes; ++idx) {
162 uint8_t byte = buffer[idx];
165 if (dispatched_message_size == 0) {
166 // Waiting for more data. Nothing to do.
167 } else if (dispatched_message_size > 0) {
168 // Message framed successfully. Reset for the next one.
171 } else if (next_byte_index_ > 0) {
172 // If OnByte() indicated an error (size < 0) and there is still data in
173 // the buffer, either the CRC failed or the payload was too big to fit
174 // in the buffer.
175 //
176 // In either case, it is possible we found what looked like the preamble
177 // somewhere within the data stream but it wasn't actually a valid
178 // message. In that case, the data we processed may contain the start of
179 // a valid message, or even one or more complete messages, starting
180 // somewhere after byte 0. Perform a resync operation to find valid
181 // messages.
183 } else {
184 // OnByte() caught an unrecoverable error and reset the buffer. Nothing
185 // to do.
186 }
187 }
189 } else {
190 return 0;
191 }
192}
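
Because OnData() processes one byte at a time and keeps partial messages in the framing buffer, a stream can be fed in chunks of any size and the return values summed. The sketch below illustrates that calling pattern under stated assumptions: FeedInChunks() is a hypothetical helper, and FakeFramer is a stand-in that only mimics the OnData() signature so the example compiles without the library.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Feed a byte stream to a framer-like object in fixed-size chunks and return
// the total number of bytes reported as complete messages.
template <typename Framer>
std::size_t FeedInChunks(Framer& framer, const std::uint8_t* data,
                         std::size_t length_bytes, std::size_t chunk_bytes) {
  assert(chunk_bytes > 0);
  std::size_t total_dispatched = 0;
  for (std::size_t offset = 0; offset < length_bytes; offset += chunk_bytes) {
    const std::size_t n = std::min(chunk_bytes, length_bytes - offset);
    total_dispatched += framer.OnData(data + offset, n);
  }
  return total_dispatched;
}

// Stand-in with the same OnData() signature (hypothetical): it pretends that
// every 4 received bytes complete one 4-byte message.
struct FakeFramer {
  std::size_t pending = 0;
  std::size_t OnData(const std::uint8_t* /*buffer*/, std::size_t length_bytes) {
    pending += length_bytes;
    const std::size_t dispatched = (pending / 4) * 4;
    pending -= dispatched;
    return dispatched;
  }
};
```

With a real framer, the same helper could be called with a FusionEngineFramer instance, since it only relies on the OnData() signature documented above.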

Reset()

void FusionEngineFramer::Reset ()

Reset the framer and discard all pending data.

Declaration at line 149 of file fusion_engine_framer.h, definition at line 148 of file fusion_engine_framer.cc.

SetBuffer()

void FusionEngineFramer::SetBuffer (void * buffer, size_t capacity_bytes)

Set the buffer to use for message framing.

Postcondition

buffer must exist for the lifetime of this instance.

Parameters
buffer

The framing buffer to use. Set to nullptr to allocate a buffer internally.

capacity_bytes

The maximum framing buffer capacity (in bytes).

Declaration at line 107 of file fusion_engine_framer.h, definition at line 112 of file fusion_engine_framer.cc.

113 if (capacity_bytes < sizeof(MessageHeader)) {
114 LOG(ERROR) << "FusionEngine framing buffer too small. [capacity="
115 << capacity_bytes << " B, min=" << sizeof(MessageHeader)
116 << " B]";
117 return;
118 }
119 // Restrict the buffer capacity to 2^31 bytes. We don't expect to ever have a
120 // single message anywhere near that large, and don't expect users to ever
121 // pass in a buffer that size. Using uint32_t instead of size_t internally
122 // makes it easier to guarantee behavior between 64b and 32b architectures. We
123 // do 2^31, not 2^32, so we can use int32_t for return values internally.
124 else if (capacity_bytes > 0x7FFFFFFF) {
125 LOG(WARNING) << "Limiting buffer capacity to 2^31 B. [original_capacity="
126 << capacity_bytes << " B]";
127 capacity_bytes = 0x7FFFFFFF;
128 }
129
131 if (buffer == nullptr) {
133 is_buffer_managed_ = true;
134 }
135
136 // Enforce 4B alignment at the beginning of the buffer.
137 uint8_t* buffer_unaligned = static_cast<uint8_t*>(buffer);
138 buffer_ = reinterpret_cast<uint8_t*>(
139 (reinterpret_cast<size_t>(buffer_unaligned) + 3) &
140 ~(static_cast<size_t>(3)));
143
144 Reset();
145}
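
The alignment step rounds the start address up to the next multiple of 4, which can skip up to 3 bytes at the front of the buffer. That is why the constructor requests capacity_bytes + 3 when it allocates internally. A minimal sketch of the arithmetic, using plain integers in place of real pointers; AlignUp4() and UsableCapacity() are hypothetical names, not library functions.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Round an address up to the next multiple of 4 (hypothetical helper).
inline std::uintptr_t AlignUp4(std::uintptr_t address) {
  return (address + 3) & ~static_cast<std::uintptr_t>(3);
}

// Bytes still usable after aligning the start of a region of the given size.
inline std::size_t UsableCapacity(std::uintptr_t address,
                                  std::size_t size_bytes) {
  const std::size_t skipped =
      static_cast<std::size_t>(AlignUp4(address) - address);
  assert(skipped <= 3);  // Alignment to 4 never skips more than 3 bytes.
  return size_bytes > skipped ? size_bytes - skipped : 0;
}
```

In the worst case (an address one past a 4-byte boundary), a region of 64 + 3 bytes still leaves exactly 64 usable bytes after alignment.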

SetMessageCallback()

P1_EXPORT void point_one::fusion_engine::parsers::FusionEngineFramer::SetMessageCallback (RawMessageCallback callback, void * context)
inline

Specify a function to be called when a message is framed.

Parameters
callback

The function to be called with the supplied context variable, the message header, and a pointer to the message payload.

context

A context value that will be passed to the callback.

Definition at line 140 of file fusion_engine_framer.h.
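
The context pointer lets a plain function pointer reach per-instance state without globals. The sketch below shows that pattern in isolation. FakeHeader, Counter, and Dispatch() are hypothetical stand-ins so the example compiles without the library; in real code the callback would match RawMessageCallback and be registered with SetMessageCallback(callback, context).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Stand-in for messages::MessageHeader (hypothetical, illustration only).
struct FakeHeader {
  std::uint16_t message_type;
  std::uint32_t payload_size_bytes;
};

// Same shape as RawMessageCallback, but using the stand-in header type.
using RawCallback = void (*)(void* context, const FakeHeader& header,
                             const void* payload);

// Per-instance state reached through the context pointer.
struct Counter {
  std::size_t messages = 0;
  std::size_t payload_bytes = 0;
};

// Callback: recover the typed state from the opaque context pointer.
void OnMessage(void* context, const FakeHeader& header,
               const void* /*payload*/) {
  auto* counter = static_cast<Counter*>(context);
  assert(counter != nullptr);
  ++counter->messages;
  counter->payload_bytes += header.payload_size_bytes;
}

// Stand-in for the framer invoking its registered callback.
void Dispatch(RawCallback callback, void* context, const FakeHeader& header,
              const void* payload) {
  if (callback != nullptr) {
    callback(context, header, payload);
  }
}
```

The object passed as context has to outlive every call that can trigger the callback, since the framer only stores the raw pointer.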

WarnOnError()

P1_EXPORT void point_one::fusion_engine::parsers::FusionEngineFramer::WarnOnError (bool enabled)
inline

Enable/disable warnings for CRC and "message too large" failures.

This is typically used when the incoming stream has multiple types of binary content (e.g., interleaved FusionEngine and RTCM messages), and the FusionEngine message preamble is expected to appear in the non-FusionEngine content occasionally.

Parameters
enabled

If true, issue warnings on errors.

Definition at line 119 of file fusion_engine_framer.h.

Private Member Functions

ClearManagedBuffer()

void FusionEngineFramer::ClearManagedBuffer ()

Free the buffer_ if it's being managed internally.

Declaration at line 209 of file fusion_engine_framer.h, definition at line 515 of file fusion_engine_framer.cc.

516 if (is_buffer_managed_ && buffer_ != nullptr) {
517 delete[] buffer_;
518 is_buffer_managed_ = false;
519 buffer_ = nullptr;
520 }
521}

OnByte()

int32_t FusionEngineFramer::OnByte (bool quiet)

Process a single byte.

Precondition

The byte must be located at buffer_[next_byte_index_ - 1].

Parameters
quiet

If true, suppress failure warning messages.

Returns

The total size of all valid, complete messages, 0 if no messages were completed, or a negative value (<0) on a CRC or "message too large" error.

Declaration at line 196 of file fusion_engine_framer.h, definition at line 195 of file fusion_engine_framer.cc.

196 // User-supplied buffer was too small. Can't parse messages.
197 if (buffer_ == nullptr) {
198 return 0;
199 }
200
201 // If warnings are disabled, run in quiet mode.
202 if (!warn_on_error_) {
203 quiet = true;
204 }
205
206 // Pull out the byte being processed.
207 if (next_byte_index_ == 0) {
208 LOG(ERROR) << "Byte not found in buffer.";
209 return 0;
210 }
211
213
214 // Look for the first sync byte.
215 //
216 // Note that we always put the first byte at offset 0 in the buffer, and the
217 // buffer is guaranteed to be 4B-aligned by the constructor, so the framed
218 // message will always be 4B aligned.
219 bool crc_check_needed = false;
220 if (state_ == State::SYNC0) {
221 VLOG(4) << "Searching for sync byte 0 [byte=" << HexPrintableInteger(byte)
222 << "]";
223 if (byte == MessageHeader::SYNC0) {
224 VLOG(4) << "Found sync byte 0.";
226 } else {
228 }
229 }
230 // Look for the second sync byte.
231 else if (state_ == State::SYNC1) {
232 VLOG(4) << "Searching for sync byte 1 [byte=" << HexPrintableInteger(byte)
233 << "]";
234 if (byte == MessageHeader::SYNC0) {
235 VLOG(4) << "Found duplicate sync byte 0.";
238 } else if (byte == MessageHeader::SYNC1) {
239 VLOG(3) << "Preamble found. Waiting for header.";
241 } else {
242 VLOG(4) << "Did not find sync byte 1. Resetting. [byte="
243 << HexPrintableInteger(byte) << "]";
247 }
248 }
249 // Search for a message header.
250 else if (state_ == State::HEADER) {
251 VLOG(4) << "Received " << next_byte_index_ << "/" << sizeof(MessageHeader)
252 << " header bytes. [byte=" << HexPrintableInteger(byte) << "]";
253
254 // Check if the header is complete.
255 if (next_byte_index_ == sizeof(MessageHeader)) {
256 auto* header = reinterpret_cast<MessageHeader*>(buffer_);
258 sizeof(MessageHeader) + header->payload_size_bytes;
259 VLOG(3) << "Header complete. Waiting for payload. [message="
260 << header->message_type << ", seq=" << header->sequence_number
261 << ", payload_size=" << header->payload_size_bytes
262 << " B, message_size=" << current_message_size_ << " B]";
263
264 // Check for overflow of the message size uint32_t variable. We don't
265 // currently expect to have _extremely_ large packets, so this should
266 // never happen for a valid message. If it does, assume this is not a
267 // valid message, and instead the sync pattern likely showed up at random
268 // in the data stream.
269 if (current_message_size_ < header->payload_size_bytes) {
270 if (quiet) {
271 VLOG(2) << "Message size overflow. Dropping suspected invalid sync. "
272 "[size="
274 << " B (payload=" << header->payload_size_bytes << " B)]";
275 } else {
277 << "Message size overflow. Dropping suspected invalid sync. "
278 "[size="
280 << " B (payload=" << header->payload_size_bytes << " B)]";
281 }
282
284 return -1;
285 }
286 // The reserved bytes in the header are currently always set to 0. If the
287 // incoming bytes are not zero, assume this is an invalid sync.
288 //
289 // This may change in the future, but for now it prevents us from
290 // needing to collect a ton of bytes before performing a CRC check if a
291 // bogus header from an invalid sync has a very large payload size that
292 // happens to still be smaller than the buffer size.
293 else if (header->reserved[0] != 0 || header->reserved[1] != 0) {
294 if (quiet) {
295 VLOG(2) << "Reserved bytes nonzero. Dropping suspected invalid sync. "
296 "[size="
298 << " B (payload=" << header->payload_size_bytes << " B)]";
299 } else {
300 LOG(WARNING) << "Reserved bytes nonzero. Dropping suspected invalid "
301 "sync. [size="
303 << " B (payload=" << header->payload_size_bytes
304 << " B)]";
305 }
306
308 return -1;
309 }
310 // If the message is too large to fit in the buffer, we cannot parse it.
311 //
312 // If this is an invalid sync, the parsed (invalid) payload length may
313 // exceed the buffer size and the invalid header will be dropped. If it
314 // does not exceed the buffer size, it'll get caught later during the CRC
315 // check.
317 if (quiet) {
318 VLOG(2) << "Message too large for buffer. [size="
320 << " B (payload=" << header->payload_size_bytes
321 << " B), buffer_capacity=" << capacity_bytes_
322 << " B (max_payload="
323 << capacity_bytes_ - sizeof(MessageHeader) << " B)]";
324 } else {
325 LOG(WARNING) << "Message too large for buffer. [size="
327 << " B (payload=" << header->payload_size_bytes
328 << " B), buffer_capacity=" << capacity_bytes_
329 << " B (max_payload="
330 << capacity_bytes_ - sizeof(MessageHeader) << " B)]";
331 }
332
334 return -1;
335 }
336 // Sanity checks passed. Start collecting the message payload next.
337 else {
338 // If there's no payload, do the CRC check now.
339 if (header->payload_size_bytes == 0) {
340 VLOG(3) << "Message has no payload. Checking CRC.";
341 crc_check_needed = true;
342 }
343 // Otherwise, collect the payload, then do the CRC check.
344 else {
346 }
347 }
348 }
349 }
350 // Collect the message payload.
351 else if (state_ == State::DATA) {
352 VLOG(4) << "Received " << next_byte_index_ << "/" << current_message_size_
353 << " message bytes (" << next_byte_index_ - sizeof(MessageHeader)
354 << "/" << current_message_size_ - sizeof(MessageHeader)
355 << " payload bytes). [byte=" << HexPrintableInteger(byte) << "]";
356
357 // If we received the full payload, check the CRC and dispatch it.
359 VLOG(3) << "Payload complete. Checking CRC.";
360 crc_check_needed = true;
361 }
362 }
363 // Illegal state.
364 else {
365 LOG(ERROR) << "Impossible parsing state.";
366 Reset();
367 return -1;
368 }
369
370 // Payload complete (or message has no payload). Check the CRC.
371 if (crc_check_needed) {
373 auto* header = reinterpret_cast<MessageHeader*>(buffer_);
374 if (crc == header->crc) {
375 VLOG(1) << "CRC passed. Dispatching message. [message="
376 << header->message_type << " (" << (unsigned)header->message_type
377 << "), seq=" << header->sequence_number
378 << ", size=" << current_message_size_
379 << " B, crc=" << HexPrintableInteger(crc) << "]";
380 auto* payload = reinterpret_cast<uint8_t*>(header + 1);
381#if P1_HAVE_STD_FUNCTION
382 if (callback_) {
383 callback_(*header, payload);
384 }
385#endif // P1_HAVE_STD_FUNCTION
386 if (raw_callback_) {
388 }
390 return static_cast<int32_t>(current_message_size_);
391 } else {
392 if (quiet) {
393 VLOG(2) << "CRC check failed. [message=" << header->message_type << " ("
394 << (unsigned)header->message_type
395 << "), seq=" << header->sequence_number
396 << ", size=" << current_message_size_
397 << " B, crc=" << HexPrintableInteger(crc)
398 << ", expected_crc=" << HexPrintableInteger(header->crc) << "]";
399 } else {
400 LOG(WARNING) << "CRC check failed. [message=" << header->message_type
401 << " (" << (unsigned)header->message_type
402 << "), seq=" << header->sequence_number
403 << ", size=" << current_message_size_
404 << " B, crc=" << HexPrintableInteger(crc)
405 << ", expected_crc=" << HexPrintableInteger(header->crc)
406 << "]";
407 }
409 return -1;
410 }
411 }
412
413 // No messages completed.
414 return 0;
415}
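
The overflow check in the HEADER state relies on unsigned wraparound: if the header size plus the payload size exceeds what a uint32_t can hold, the sum wraps and ends up smaller than the payload size itself. A minimal sketch of that test follows. The 24-byte header size is an assumption for illustration (the library uses sizeof(MessageHeader)), and MessageSizeOverflows() is a hypothetical helper.

```cpp
#include <cassert>
#include <cstdint>

// Assumed header size, for illustration only.
constexpr std::uint32_t kHeaderSize = 24;

// Returns true if header + payload does not fit in a uint32_t.
inline bool MessageSizeOverflows(std::uint32_t payload_size_bytes) {
  // Unsigned arithmetic wraps modulo 2^32, so a wrapped sum is always
  // smaller than either operand.
  const std::uint32_t message_size = kHeaderSize + payload_size_bytes;
  return message_size < payload_size_bytes;
}
```

A payload size taken from a bogus header near the top of the 32-bit range is rejected by this comparison before any payload bytes are collected.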

Resync()

uint32_t FusionEngineFramer::Resync ()

Perform a resynchronization operation starting at buffer_[1].

Returns

The total size of all valid, complete messages, or 0 if no messages were completed.

Declaration at line 204 of file fusion_engine_framer.h, definition at line 418 of file fusion_engine_framer.cc.

419 // If the message preamble shows up randomly somewhere in the data stream, we
420 // may sync to it and try to parse a message starting at that arbitrary
421 // location. We will eventually detect the bad sync either by CRC failure or
422 // because the payload size we extract from the bogus message header is too
423 // large.
424 //
425 // In either case, the bytes we collected - both the bogus header and the
426 // payload bytes based on the size in the header - may contain the start of a
427 // valid message, or even one or more complete messages. We need to resync and
428 // try to find those messages. Any of the following scenarios is possible:
429 // ...validvalidval        <-- Contiguous valid messages
430 // ...valid...valid...val  <-- Multiple valid messages, separated by invalid
431 // ...valid...valid...     <-- Similar, but ending with invalid
432 // ...val                  <-- Start of a valid message, no complete messages
433 // ...                     <-- No valid content
434 //
435 // Ideally, we would search for these messages in-place and avoid shifting the
436 // data around in the buffer. However, since we need the messages to be
437 // 4B-aligned and there's only a 1-in-4 chance of that happening naturally,
438 // we'd have to shift the data 75% of the time regardless.
439 //
440 // Given that, we simply shift all data left in the buffer and process one
441 // message at a time. This is not as efficient, but it's the simplest option.
443 VLOG(1) << "Attempting resynchronization. [" << available_bytes - 1
444 << " candidate bytes]";
450
451 // Skip forward until we see a SYNC0.
452 if (state_ == State::SYNC0) {
454 VLOG(1) << "Candidate message start found @ offset " << offset << "/"
455 << available_bytes << ".";
456 // Shift all of the data left in the buffer.
458 std::memmove(buffer_, buffer_ + offset, available_bytes);
459 offset = 0;
460 } else {
461 VLOG(4) << "Skipping non-sync byte 0 @ offset " << offset << "/"
463 << ". [byte=" << HexPrintableInteger(current_byte) << "]";
464 continue;
465 }
466 }
467
468 // Process this byte. If we end up back in the SYNC0 state, either A) the
469 // SYNC0 we found was a valid message and got dispatched, or B) was not the
470 // start of a valid message.
471 //
472 // In (A), message_size > 0 indicating there was a valid message, so we know
473 // we can just keep going with the rest of the data in the buffer.
474 //
475 // In (B), message_size == 0. In that case we'll rewind back to the byte
476 // just after we located the SYNC0 and see if there's another one.
477 //
478 // Note that next_byte_index_ always points to the next open slot, i.e.,
479 // one byte _after_ the current byte.
482
483 if (state_ == State::SYNC0) {
484 // Note that offset will be incremented when we loop around, so we set it
485 // to N-1, where N is wherever we want to start searching next.
486 if (message_size > 0) {
488 offset = message_size - 1;
489 VLOG(1)
490 << "Resync found a complete message. Continuing search @ offset "
491 << offset + 1 << "/" << available_bytes
492 << ". [message_size=" << message_size << ", "
494 << " candidate bytes remaining]";
495 } else {
496 size_t prev_offset = offset;
497 offset = 0;
498 VLOG(1) << "Candidate message rejected after " << prev_offset
499 << " bytes. Restarting search @ offset " << offset + 1 << "/"
500 << available_bytes << ". [" << available_bytes - 1
501 << " candidate bytes remaining]";
502 }
503
505 }
506 }
507
508 VLOG(1) << "Resynchronization finished. " << next_byte_index_
509 << " bytes remaining in buffer.";
510
511 return total_message_size;
512}
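
The "shift left" step described in the comments can be sketched on its own: skip the byte at offset 0, look for the next candidate first sync byte, and move everything from there to the front of the buffer so the candidate message starts at the aligned offset 0. This is a simplified illustration, not the library's implementation. ShiftToNextSync() is a hypothetical helper and the sync byte value 0x2E is an assumption.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>

constexpr std::uint8_t kSync0 = 0x2E;  // Assumed value, illustration only.

// Find the next candidate SYNC0 at or after offset 1 and shift it to the
// front of the buffer. Returns the number of bytes left in the buffer, or 0
// if no candidate was found.
inline std::size_t ShiftToNextSync(std::uint8_t* buffer,
                                   std::size_t available_bytes) {
  assert(buffer != nullptr);
  for (std::size_t offset = 1; offset < available_bytes; ++offset) {
    if (buffer[offset] == kSync0) {
      const std::size_t remaining = available_bytes - offset;
      // memmove, not memcpy: source and destination ranges overlap.
      std::memmove(buffer, buffer + offset, remaining);
      return remaining;
    }
  }
  return 0;
}
```

After the shift, the remaining bytes would be run through the normal per-byte state machine again, which is what lets a rejected candidate give way to a valid message that started inside it.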

Private Member Attributes

buffer_

uint8_t* point_one::fusion_engine::parsers::FusionEngineFramer::buffer_ {nullptr}

Definition at line 178 of file fusion_engine_framer.h.

178 uint8_t* buffer_{nullptr};

capacity_bytes_

uint32_t point_one::fusion_engine::parsers::FusionEngineFramer::capacity_bytes_ {0}

Definition at line 179 of file fusion_engine_framer.h.

current_message_size_

uint32_t point_one::fusion_engine::parsers::FusionEngineFramer::current_message_size_ {0}

Definition at line 183 of file fusion_engine_framer.h.

is_buffer_managed_

bool point_one::fusion_engine::parsers::FusionEngineFramer::is_buffer_managed_ = false

Definition at line 177 of file fusion_engine_framer.h.

177 bool is_buffer_managed_ = false;

next_byte_index_

uint32_t point_one::fusion_engine::parsers::FusionEngineFramer::next_byte_index_ {0}

Definition at line 182 of file fusion_engine_framer.h.

raw_callback_

RawMessageCallback point_one::fusion_engine::parsers::FusionEngineFramer::raw_callback_ = nullptr

Definition at line 173 of file fusion_engine_framer.h.

raw_callback_context_

void* point_one::fusion_engine::parsers::FusionEngineFramer::raw_callback_context_ = nullptr

Definition at line 174 of file fusion_engine_framer.h.

174 void* raw_callback_context_ = nullptr;

state_

State point_one::fusion_engine::parsers::FusionEngineFramer::state_ {State::SYNC0}

Definition at line 181 of file fusion_engine_framer.h.

warn_on_error_

bool point_one::fusion_engine::parsers::FusionEngineFramer::warn_on_error_ = true

Definition at line 176 of file fusion_engine_framer.h.

176 bool warn_on_error_ = true;

The documentation for this class was generated from the following files:

fusion_engine_framer.h
fusion_engine_framer.cc

Generated via doxygen2docusaurus 2.0.0 by Doxygen 1.9.8.