6 #define P1_VMODULE_NAME fusion_engine_framer
11 #if P1_HAVE_STD_OSTREAM
14 # include <type_traits>
// Fragment of a stream-output routine that writes `obj` to `stream` in
// hexadecimal. NOTE(review): the enclosing signature and several
// intermediate lines are not visible in this listing; the comments below
// describe only the statements that are shown.
40 #if P1_HAVE_STD_OSTREAM
// Reject non-integral T at compile time.
41 static_assert(std::is_integral<T>::value,
"Integer required.");
// Write a "0x" prefix, then select hex output with '0' as the fill character
// and a field width of two characters per byte of sizeof(obj).
43 stream <<
"0x" << std::hex << std::setfill(
'0') << std::setw(
sizeof(obj) * 2);
// Convert the value to unsigned and keep only its low 8 bits before
// inserting it -- presumably so a byte-sized value is printed as a number
// rather than as a character. NOTE(review): any condition guarding this
// statement is not visible here.
46 stream << (((unsigned)obj.
value_) & 0xFF);
// For one-byte objects, also emit the value as a character wrapped in
// " ('...')". NOTE(review): any further check between these two lines is
// not visible here.
53 if (
sizeof(obj) == 1) {
55 stream <<
" ('" << (char)obj.
value_ <<
"')";
// Constructs a framer and installs its storage through SetBuffer().
//
// @param buffer          Storage to use, or nullptr.
// @param capacity_bytes  Size of the storage in bytes.
//
// NOTE(review): the lines separating the two SetBuffer() calls are not
// visible in this listing; the second call is presumably the
// non-null-buffer path -- confirm.
95 FusionEngineFramer::FusionEngineFramer(
void* buffer,
size_t capacity_bytes) {
97 if (buffer ==
nullptr) {
// No buffer supplied: pass nullptr on and request 3 extra bytes. SetBuffer()
// rounds the buffer address up to a multiple of 4, which can consume up to
// 3 bytes, so the extra bytes presumably keep the usable capacity at the
// requested size.
100 SetBuffer(
nullptr, capacity_bytes + 3);
// Use the supplied buffer and capacity unchanged.
104 SetBuffer(buffer, capacity_bytes);
109 FusionEngineFramer::~FusionEngineFramer() { ClearManagedBuffer(); }
// Installs the storage used to assemble incoming messages.
//
// @param buffer          Storage to use; when nullptr, an array of
//                        capacity_bytes bytes is allocated here and flagged
//                        as managed.
// @param capacity_bytes  Size of the storage in bytes.
//
// NOTE(review): several lines of this function (including the condition for
// the "too small" error and the target of the final capacity expression) are
// not visible in this listing.
112 void FusionEngineFramer::SetBuffer(
void* buffer,
size_t capacity_bytes) {
// Error path for an undersized buffer; the size check itself is not visible.
114 LOG(ERROR) <<
"FusionEngine framing buffer too small. [capacity="
// Clamp oversized capacities to 0x7FFFFFFF (2^31 - 1) and warn about it.
124 else if (capacity_bytes > 0x7FFFFFFF) {
125 LOG(WARNING) <<
"Limiting buffer capacity to 2^31 B. [original_capacity="
126 << capacity_bytes <<
" B]";
127 capacity_bytes = 0x7FFFFFFF;
// Drop any previously managed buffer before installing the new one.
130 ClearManagedBuffer();
// No caller storage: allocate our own and remember that we own it.
131 if (buffer ==
nullptr) {
132 buffer =
new uint8_t[capacity_bytes];
133 is_buffer_managed_ =
true;
// Round the start address up to the next multiple of 4 ((addr + 3) & ~3) and
// store the aligned pointer in buffer_.
// NOTE(review): only the aligned pointer is visibly retained; confirm that
// a managed allocation is released through the pointer returned by new[].
137 uint8_t* buffer_unaligned =
static_cast<uint8_t*
>(buffer);
138 buffer_ =
reinterpret_cast<uint8_t*
>(
139 (
reinterpret_cast<size_t>(buffer_unaligned) + 3) &
140 ~(
static_cast<size_t>(3)));
// Usable capacity: the requested size minus the bytes skipped for alignment.
// The left-hand side of this assignment is not visible in this listing.
142 static_cast<uint32_t
>(capacity_bytes - (buffer_ - buffer_unaligned));
// Returns the framer to its initial parsing state: waiting for the first
// sync byte (State::SYNC0), with the write index and the current message
// size both zeroed.
148 void FusionEngineFramer::Reset() {
149 state_ = State::SYNC0;
150 next_byte_index_ = 0;
151 current_message_size_ = 0;
// Feeds a block of incoming bytes through the framer, one byte at a time.
//
// @param buffer        Incoming bytes.
// @param length_bytes  Number of bytes available in `buffer`.
// @return The accumulated size, in bytes, of the messages dispatched while
//         processing this block (total_dispatched_bytes).
//
// NOTE(review): some lines of this function are not visible in this listing,
// including what happens when buffer_ is nullptr.
155 size_t FusionEngineFramer::OnData(
const uint8_t* buffer,
size_t length_bytes) {
// Bytes are only processed when internal storage is available.
158 if (buffer_ !=
nullptr) {
159 VLOG(2) <<
"Received " << length_bytes <<
" bytes.";
160 size_t total_dispatched_bytes = 0;
161 for (
size_t idx = 0; idx < length_bytes; ++idx) {
// Append the byte to the internal buffer, then let OnByte() advance the
// parser using the byte just stored.
162 uint8_t
byte = buffer[idx];
163 buffer_[next_byte_index_++] = byte;
164 int32_t dispatched_message_size = OnByte(
false);
// Zero result: branch body not visible in this listing.
165 if (dispatched_message_size == 0) {
167 }
// Positive result: a message of that size was dispatched, so restart the
// buffer from index 0 and count its bytes.
else if (dispatched_message_size > 0) {
169 next_byte_index_ = 0;
170 total_dispatched_bytes += (size_t)dispatched_message_size;
171 }
// Remaining case with bytes still buffered: Resync() is run on the buffered
// data and the size of any messages it reports is added to the total.
// NOTE(review): the lines between this condition and the Resync() call are
// not visible.
else if (next_byte_index_ > 0) {
182 total_dispatched_bytes += Resync();
188 return total_dispatched_bytes;
// Advances the framing state machine using the most recently buffered byte
// (buffer_[next_byte_index_ - 1]). The visible states are SYNC0 -> SYNC1 ->
// HEADER -> DATA, followed by a CRC check once a message is complete.
//
// @param quiet  NOTE(review): the visible lines do not show where this flag
//               is consulted; many messages appear twice (VLOG and
//               LOG(WARNING)), presumably selected by `quiet` and/or
//               warn_on_error_ -- confirm.
// @return The size of the dispatched message when the CRC matches (see the
//         return near the end). Other return values are on lines that are
//         not visible in this listing.
//
// NOTE(review): many lines of this function are missing from this listing
// (conditions, else branches, closing braces, local declarations such as
// `header` and `crc`). The comments below describe only what is shown.
195 int32_t FusionEngineFramer::OnByte(
bool quiet) {
// Guard: no storage to parse from. The body of this branch is not visible.
197 if (buffer_ ==
nullptr) {
202 if (!warn_on_error_) {
// Guard: OnByte() expects at least one byte to have been buffered already.
207 if (next_byte_index_ == 0) {
208 LOG(ERROR) <<
"Byte not found in buffer.";
// The byte under examination is the last one written to the buffer.
212 uint8_t
byte = buffer_[next_byte_index_ - 1];
219 bool crc_check_needed =
false;
// --- SYNC0: looking for the first sync byte. ---
220 if (state_ == State::SYNC0) {
224 VLOG(4) <<
"Found sync byte 0.";
225 state_ = State::SYNC1;
// --- SYNC1: looking for the second sync byte. Three outcomes are visible:
// a repeated first sync byte (stay in SYNC1), the expected second byte
// (move on to HEADER), or anything else (full reset to SYNC0). ---
231 else if (state_ == State::SYNC1) {
235 VLOG(4) <<
"Found duplicate sync byte 0.";
236 state_ = State::SYNC1;
239 VLOG(3) <<
"Preamble found. Waiting for header.";
240 state_ = State::HEADER;
242 VLOG(4) <<
"Did not find sync byte 1. Resetting. [byte="
244 state_ = State::SYNC0;
245 next_byte_index_ = 0;
246 current_message_size_ = 0;
// --- HEADER: once the header is available, compute the full message size
// and validate it. The size expression and the header-complete check are
// not visible in this listing. ---
250 else if (state_ == State::HEADER) {
257 current_message_size_ =
259 VLOG(3) <<
"Header complete. Waiting for payload. [message="
260 << header->message_type <<
", seq=" << header->sequence_number
261 <<
", payload_size=" << header->payload_size_bytes
262 <<
" B, message_size=" << current_message_size_ <<
" B]";
// A total size smaller than the payload size alone indicates the size
// computation wrapped around; treat the sync as bogus and go back to SYNC0.
269 if (current_message_size_ < header->payload_size_bytes) {
271 VLOG(2) <<
"Message size overflow. Dropping suspected invalid sync. "
273 << current_message_size_
274 <<
" B (payload=" << header->payload_size_bytes <<
" B)]";
277 <<
"Message size overflow. Dropping suspected invalid sync. "
279 << current_message_size_
280 <<
" B (payload=" << header->payload_size_bytes <<
" B)]";
283 state_ = State::SYNC0;
// Nonzero reserved bytes in the header are also treated as an invalid sync.
293 else if (header->reserved[0] != 0 || header->reserved[1] != 0) {
295 VLOG(2) <<
"Reserved bytes nonzero. Dropping suspected invalid sync. "
297 << current_message_size_
298 <<
" B (payload=" << header->payload_size_bytes <<
" B)]";
300 LOG(WARNING) <<
"Reserved bytes nonzero. Dropping suspected invalid "
302 << current_message_size_
303 <<
" B (payload=" << header->payload_size_bytes
307 state_ = State::SYNC0;
// The message cannot fit in the framing buffer: drop it and return to SYNC0.
316 else if (current_message_size_ > capacity_bytes_) {
318 VLOG(2) <<
"Message too large for buffer. [size="
319 << current_message_size_
320 <<
" B (payload=" << header->payload_size_bytes
321 <<
" B), buffer_capacity=" << capacity_bytes_
322 <<
" B (max_payload="
325 LOG(WARNING) <<
"Message too large for buffer. [size="
326 << current_message_size_
327 <<
" B (payload=" << header->payload_size_bytes
328 <<
" B), buffer_capacity=" << capacity_bytes_
329 <<
" B (max_payload="
333 state_ = State::SYNC0;
// Header accepted. With an empty payload the message is already complete, so
// go straight to the CRC check; the DATA transition below is presumably the
// non-empty-payload path (the else line is not visible).
339 if (header->payload_size_bytes == 0) {
340 VLOG(3) <<
"Message has no payload. Checking CRC.";
341 crc_check_needed =
true;
345 state_ = State::DATA;
// --- DATA: accumulate payload bytes until the whole message is buffered. ---
351 else if (state_ == State::DATA) {
352 VLOG(4) <<
"Received " << next_byte_index_ <<
"/" << current_message_size_
353 <<
" message bytes (" << next_byte_index_ -
sizeof(
MessageHeader)
358 if (next_byte_index_ == current_message_size_) {
359 VLOG(3) <<
"Payload complete. Checking CRC.";
360 crc_check_needed =
true;
// Reached for a state value not handled above.
365 LOG(ERROR) <<
"Impossible parsing state.";
// --- CRC check: compare the computed CRC against the header's CRC. The CRC
// computation itself is not visible in this listing. ---
371 if (crc_check_needed) {
374 if (crc == header->crc) {
375 VLOG(1) <<
"CRC passed. Dispatching message. [message="
376 << header->message_type <<
" (" << (unsigned)header->message_type
377 <<
"), seq=" << header->sequence_number
378 <<
", size=" << current_message_size_
// The payload begins immediately after the header in the buffer.
380 auto* payload =
reinterpret_cast<uint8_t*
>(header + 1);
// Hand the message to the registered callback(s). NOTE(review): the checks
// that guard each callback invocation are not visible here.
381 #if P1_HAVE_STD_FUNCTION
383 callback_(*header, payload);
385 #endif // P1_HAVE_STD_FUNCTION
387 raw_callback_(raw_callback_context_, *header, payload);
// Message handled: look for the next sync and report the message size.
389 state_ = State::SYNC0;
390 return static_cast<int32_t
>(current_message_size_);
// CRC mismatch: log it and return to SYNC0.
393 VLOG(2) <<
"CRC check failed. [message=" << header->message_type <<
" ("
394 << (unsigned)header->message_type
395 <<
"), seq=" << header->sequence_number
396 <<
", size=" << current_message_size_
400 LOG(WARNING) <<
"CRC check failed. [message=" << header->message_type
401 <<
" (" << (unsigned)header->message_type
402 <<
"), seq=" << header->sequence_number
403 <<
", size=" << current_message_size_
408 state_ = State::SYNC0;
// Re-scans the bytes already held in the buffer for a message start,
// beginning at offset 1, and replays them through OnByte(true).
//
// @return The summed size of all complete messages found during the rescan
//         (total_message_size).
//
// NOTE(review): several lines of this function are missing from this listing
// (the sync-byte test, else branches, loop control statements and closing
// braces). The comments below describe only what is shown.
418 uint32_t FusionEngineFramer::Resync() {
// Snapshot how many bytes are buffered, then restart parsing from SYNC0 with
// an empty write index.
442 uint32_t available_bytes = next_byte_index_;
443 VLOG(1) <<
"Attempting resynchronization. [" << available_bytes - 1
444 <<
" candidate bytes]";
445 uint32_t total_message_size = 0;
446 state_ = State::SYNC0;
447 next_byte_index_ = 0;
// Offset 0 is skipped: the scan starts at the second buffered byte.
448 for (uint32_t offset = 1; offset < available_bytes; ++offset) {
449 uint8_t current_byte = buffer_[offset];
// While searching for a message start: when a candidate is found, slide the
// remaining bytes down to the front of the buffer so the candidate starts at
// index 0. Bytes that are not a candidate start are skipped.
452 if (state_ == State::SYNC0) {
454 VLOG(1) <<
"Candidate message start found @ offset " << offset <<
"/"
455 << available_bytes <<
".";
457 available_bytes -= offset;
458 std::memmove(buffer_, buffer_ + offset, available_bytes);
461 VLOG(4) <<
"Skipping non-sync byte 0 @ offset " << offset <<
"/"
// Present bytes [0, offset] to the parser and process the byte at `offset`.
480 next_byte_index_ = offset + 1;
481 int32_t message_size = OnByte(
true);
// The parser is back in SYNC0: either a complete message was dispatched
// (message_size > 0) or the candidate was rejected.
483 if (state_ == State::SYNC0) {
486 if (message_size > 0) {
// Count the message and set `offset` to its last byte, so the loop's
// increment resumes the search at the byte that follows it.
487 total_message_size += message_size;
488 offset = message_size - 1;
490 <<
"Resync found a complete message. Continuing search @ offset "
491 << offset + 1 <<
"/" << available_bytes
492 <<
". [message_size=" << message_size <<
", "
493 << (available_bytes - message_size - 1)
494 <<
" candidate bytes remaining]";
// Rejected candidate: remember how far it got for the log message. The
// statement that repositions `offset` is not visible in this listing.
496 size_t prev_offset = offset;
498 VLOG(1) <<
"Candidate message rejected after " << prev_offset
499 <<
" bytes. Restarting search @ offset " << offset + 1 <<
"/"
500 << available_bytes <<
". [" << available_bytes - 1
501 <<
" candidate bytes remaining]";
504 next_byte_index_ = 0;
508 VLOG(1) <<
"Resynchronization finished. " << next_byte_index_
509 <<
" bytes remaining in buffer.";
511 return total_message_size;
// Gives up ownership of the internal buffer when the framer is managing one:
// acts only if is_buffer_managed_ is set and buffer_ is non-null, then clears
// the managed flag.
// NOTE(review): the statement that releases the storage is presumably on a
// line that is not visible in this listing -- confirm.
515 void FusionEngineFramer::ClearManagedBuffer() {
516 if (is_buffer_managed_ && buffer_ !=
nullptr) {
518 is_buffer_managed_ =
false;