6 #define P1_VMODULE_NAME fusion_engine_framer
// Fragment of a stream-output routine; its signature and any surrounding
// conditions are not visible in this excerpt.
// Emit the byte as a zero-padded, two-digit hexadecimal value prefixed with 0x,
// then switch the stream back to decimal formatting. No restoration of the
// fill character is visible in this excerpt.
29 stream <<
"0x" << std::hex << std::setfill(
'0') << std::setw(2)
30 << (unsigned)obj.
byte_ << std::dec;
// Append the same byte rendered as a character, wrapped in parentheses and
// single quotes. NOTE(review): whatever guards this output is not shown here.
32 stream <<
" ('" << (char)obj.
byte_ <<
"')";
// Construct a framer over caller-provided storage, or over storage allocated
// by SetBuffer() when no buffer is supplied.
//
// buffer: storage to use, or nullptr to have SetBuffer() allocate it.
// capacity_bytes: requested capacity in bytes.
41 FusionEngineFramer::FusionEngineFramer(
void* buffer,
size_t capacity_bytes) {
43 if (buffer ==
nullptr) {
// Request 3 extra bytes. NOTE(review): presumably headroom for the 4-byte
// alignment step in SetBuffer(), which can skip up to 3 leading bytes - confirm.
46 SetBuffer(
nullptr, capacity_bytes + 3);
// Otherwise pass the buffer and capacity through unchanged.
50 SetBuffer(buffer, capacity_bytes);
// Install the storage used for framing.
//
// buffer: storage to use; nullptr requests an internally managed allocation.
// capacity_bytes: size of the storage in bytes; values above 0x7FFFFFFF are
//   clamped with a warning.
//
// The start of the storage is rounded up to a 4-byte boundary, and the usable
// capacity is reduced by however many bytes that rounding skips.
55 void FusionEngineFramer::SetBuffer(
void* buffer,
size_t capacity_bytes) {
// Report a capacity that is too small. NOTE(review): the size check guarding
// this error, and what happens afterwards, are not visible in this excerpt.
57 LOG(ERROR) <<
"FusionEngine framing buffer too small. [capacity="
// Clamp oversized requests to 0x7FFFFFFF bytes. Note that the log text says
// 2^31 B while the value actually applied is 2^31 - 1.
67 else if (capacity_bytes > 0x7FFFFFFF) {
68 LOG(WARNING) <<
"Limiting buffer capacity to 2^31 B. [original_capacity="
69 << capacity_bytes <<
" B]";
70 capacity_bytes = 0x7FFFFFFF;
// No buffer supplied: allocate one and keep ownership in managed_buffer_.
73 if (buffer ==
nullptr) {
74 managed_buffer_.reset(
new uint8_t[capacity_bytes]);
75 buffer = managed_buffer_.get();
76 }
else if (buffer != managed_buffer_.get()) {
// A different buffer was supplied: release any internally managed allocation.
// When the supplied pointer is the managed buffer itself, neither branch runs
// and the existing allocation is kept.
77 managed_buffer_.reset(
nullptr);
// Round the start address up to the next multiple of 4; buffer_ is the
// aligned start of the usable region.
81 uint8_t* buffer_unaligned =
static_cast<uint8_t*
>(buffer);
82 buffer_ =
reinterpret_cast<uint8_t*
>(
83 (
reinterpret_cast<size_t>(buffer_unaligned) + 3) &
84 ~(
static_cast<size_t>(3)));
// Usable capacity is the requested capacity minus the bytes skipped by the
// alignment step, narrowed to uint32_t. The assignment target is on a line
// that is not shown in this excerpt.
86 static_cast<uint32_t
>(capacity_bytes - (buffer_ - buffer_unaligned));
// Return the framer to its initial search state: wait for the first sync byte
// and clear the expected message size. NOTE(review): a statement between these
// two assignments is not visible in this excerpt.
92 void FusionEngineFramer::Reset() {
93 state_ = State::SYNC0;
95 current_message_size_ = 0;
// Feed incoming bytes to the framer one at a time.
//
// buffer: input bytes to process.
// length_bytes: number of bytes available in buffer.
//
// Returns the accumulated size of the messages reported by OnByte() and
// Resync() while consuming this input. NOTE(review): the return value used when
// buffer_ is nullptr is not visible in this excerpt.
99 size_t FusionEngineFramer::OnData(
const uint8_t* buffer,
size_t length_bytes) {
// Only process input when internal storage is available.
102 if (buffer_ !=
nullptr) {
103 VLOG(2) <<
"Received " << length_bytes <<
" bytes.";
104 size_t total_dispatched_bytes = 0;
105 for (
size_t idx = 0; idx < length_bytes; ++idx) {
// Append the byte to the internal buffer, then let the state machine inspect it.
106 uint8_t
byte = buffer[idx];
107 buffer_[next_byte_index_++] = byte;
108 int32_t dispatched_message_size = OnByte(
false);
// Zero: no action is visible for this case in the excerpt.
109 if (dispatched_message_size == 0) {
111 }
else if (dispatched_message_size > 0) {
// Positive: a message of that size was dispatched, so start filling the
// internal buffer from the beginning again.
113 next_byte_index_ = 0;
114 total_dispatched_bytes += (size_t)dispatched_message_size;
115 }
else if (next_byte_index_ > 0) {
// Negative result with bytes still buffered: search the buffered bytes for
// another message start and count anything dispatched along the way.
126 total_dispatched_bytes += Resync();
132 return total_dispatched_bytes;
// Advance the framing state machine using the byte most recently stored in
// buffer_ (the one at next_byte_index_ - 1).
//
// quiet: NOTE(review): presumably selects the verbose-level log variants over
//   the warning variants below - the code that uses it is not visible here.
//
// Returns the message size when a complete message passes the CRC check and is
// handed to callback_. The caller OnData() treats 0 as no message yet and a
// negative value as an error; the return statements for those cases are not
// visible in this excerpt.
139 int32_t FusionEngineFramer::OnByte(
bool quiet) {
// No storage available. NOTE(review): the body of this guard is not shown.
141 if (buffer_ ==
nullptr) {
// NOTE(review): body not shown; presumably adjusts quiet when warnings are
// disabled - confirm.
146 if (!warn_on_error_) {
// Nothing has been stored yet, so there is no byte to examine.
151 if (next_byte_index_ == 0) {
152 LOG(ERROR) <<
"Byte not found in buffer.";
// Examine the most recently appended byte.
156 uint8_t
byte = buffer_[next_byte_index_ - 1];
// Set once a full message (header plus payload, if any) has been received.
163 bool crc_check_needed =
false;
// State SYNC0: looking for the first sync byte. The comparison that detects
// it is not visible in this excerpt.
164 if (state_ == State::SYNC0) {
168 VLOG(4) <<
"Found sync byte 0.";
169 state_ = State::SYNC1;
// State SYNC1: expecting the second sync byte. Three outcomes are visible:
// a repeated first sync byte keeps the state, a completed preamble moves on
// to the header, and anything else restarts the search.
175 else if (state_ == State::SYNC1) {
179 VLOG(4) <<
"Found duplicate sync byte 0.";
180 state_ = State::SYNC1;
183 VLOG(3) <<
"Preamble found. Waiting for header.";
184 state_ = State::HEADER;
186 VLOG(4) <<
"Did not find sync byte 1. Resetting. [byte="
// Restart the search with an empty buffer.
188 state_ = State::SYNC0;
189 next_byte_index_ = 0;
190 current_message_size_ = 0;
// State HEADER: accumulating the message header. The check for header
// completion and the expression assigned to current_message_size_ are not
// visible in this excerpt.
194 else if (state_ == State::HEADER) {
213 current_message_size_ =
215 VLOG(3) <<
"Header complete. Waiting for payload. [message="
216 << header->message_type <<
" (" << (unsigned)header->message_type
217 <<
"), seq=" << header->sequence_number
218 <<
", payload_size=" << header->payload_size_bytes <<
" B]";
// NOTE(review): the right-hand side of this comparison is not shown;
// presumably it is the largest payload the buffer can hold, since the
// alternative path below reports the message as too large.
219 if (header->payload_size_bytes <=
// A message without a payload is complete as soon as the header is.
222 if (header->payload_size_bytes == 0) {
223 VLOG(3) <<
"Message has no payload. Checking CRC.";
224 crc_check_needed =
true;
// Otherwise start collecting payload bytes.
228 state_ = State::DATA;
// Message does not fit. The same text is logged in two variants, one at
// verbose level 2 and one as a warning. NOTE(review): the condition choosing
// between them is not shown; presumably the quiet flag.
232 VLOG(2) <<
"Message too large for buffer. [size="
233 << current_message_size_
234 <<
" B (payload=" << header->payload_size_bytes
235 <<
" B), buffer_capacity=" << capacity_bytes_
236 <<
" B (max_payload="
239 LOG(WARNING) <<
"Message too large for buffer. [size="
240 << current_message_size_
241 <<
" B (payload=" << header->payload_size_bytes
242 <<
" B), buffer_capacity=" << capacity_bytes_
243 <<
" B (max_payload="
// Abandon this candidate and go back to searching for a sync byte.
247 state_ = State::SYNC0;
// State DATA: collecting payload bytes until the buffered byte count reaches
// the expected total message size.
253 else if (state_ == State::DATA) {
254 VLOG(4) <<
"Received " << next_byte_index_ <<
"/" << current_message_size_
255 <<
" message bytes (" << next_byte_index_ -
sizeof(
MessageHeader)
260 if (next_byte_index_ == current_message_size_) {
261 VLOG(3) <<
"Payload complete. Checking CRC.";
262 crc_check_needed =
true;
// Any other state value is unexpected.
267 LOG(ERROR) <<
"Impossible parsing state.";
// A full message is buffered: compare the computed CRC against the value in
// the header. The CRC computation itself is not visible in this excerpt.
273 if (crc_check_needed) {
276 if (crc == header->crc) {
277 VLOG(1) <<
"CRC passed. Dispatching message. [message="
278 << header->message_type <<
" (" << (unsigned)header->message_type
279 <<
"), seq=" << header->sequence_number
280 <<
", size=" << current_message_size_ <<
" B, crc=0x" << std::hex
281 << std::setfill(
'0') << std::setw(8) << crc <<
"]";
// The payload begins immediately after the header in the buffer.
283 auto* payload =
reinterpret_cast<uint8_t*
>(header + 1);
284 callback_(*header, payload);
// Message handled: resume searching and report its size to the caller.
286 state_ = State::SYNC0;
287 return static_cast<int32_t
>(current_message_size_);
// CRC mismatch. Logged in two variants, one at verbose level 2 and one as a
// warning. NOTE(review): the condition choosing between them is not shown.
290 VLOG(2) <<
"CRC check failed. [message=" << header->message_type <<
" ("
291 << (unsigned)header->message_type
292 <<
"), seq=" << header->sequence_number
293 <<
", size=" << current_message_size_ <<
" B, crc=0x"
294 << std::hex << std::setfill(
'0') << std::setw(8) << crc
295 <<
", expected_crc=0x" << std::setw(8) << header->crc <<
"]";
297 LOG(WARNING) <<
"CRC check failed. [message=" << header->message_type
298 <<
" (" << (unsigned)header->message_type
299 <<
"), seq=" << header->sequence_number
300 <<
", size=" << current_message_size_ <<
" B, crc=0x"
301 << std::hex << std::setfill(
'0') << std::setw(8) << crc
302 <<
", expected_crc=0x" << std::setw(8) << header->crc
// Discard the failed candidate and go back to searching for a sync byte.
305 state_ = State::SYNC0;
// Search the bytes already held in buffer_ for another message start after a
// candidate message was rejected, replaying them through OnByte() in quiet mode.
//
// Returns the combined size of any complete messages found during the search.
315 uint32_t FusionEngineFramer::Resync() {
// Snapshot how many bytes are buffered before the state is cleared. The caller
// visible in this file, OnData(), only invokes this when that count is nonzero;
// the unsigned expression available_bytes - 1 below relies on that.
339 uint32_t available_bytes = next_byte_index_;
340 VLOG(1) <<
"Attempting resynchronization. [" << available_bytes - 1
341 <<
" candidate bytes]";
342 uint32_t total_message_size = 0;
343 state_ = State::SYNC0;
344 next_byte_index_ = 0;
// Start at offset 1. NOTE(review): presumably because offset 0 is the start of
// the candidate that was just rejected - confirm.
345 for (uint32_t offset = 1; offset < available_bytes; ++offset) {
346 uint8_t current_byte = buffer_[offset];
// While searching for a message start, bytes are either accepted as a
// candidate start or skipped. The test that decides is not visible here.
349 if (state_ == State::SYNC0) {
351 VLOG(1) <<
"Candidate message start found @ offset " << offset <<
"/"
352 << available_bytes <<
".";
// Shift the remaining bytes so the candidate begins at the front of buffer_.
// The source and destination ranges lie in the same buffer, hence memmove.
354 available_bytes -= offset;
355 std::memmove(buffer_, buffer_ + offset, available_bytes);
358 VLOG(4) <<
"Skipping non-sync byte 0 @ offset " << offset <<
"/"
359 << available_bytes <<
". [byte=" <<
PrintableByte(current_byte)
// Present the byte at this offset to the state machine as if it had just
// been appended, with quiet set to true.
377 next_byte_index_ = offset + 1;
378 int32_t message_size = OnByte(
true);
// Being back in SYNC0 means the candidate was either completed or rejected.
380 if (state_ == State::SYNC0) {
// A complete message was found. Setting offset to message_size - 1 makes the
// loop increment land on the index equal to message_size.
383 if (message_size > 0) {
384 total_message_size += message_size;
385 offset = message_size - 1;
387 <<
"Resync found a complete message. Continuing search @ offset "
388 << offset + 1 <<
"/" << available_bytes
389 <<
". [message_size=" << message_size <<
", "
390 << (available_bytes - message_size - 1)
391 <<
" candidate bytes remaining]";
// The candidate was rejected. NOTE(review): the statement that repositions
// offset before this log message is not visible in this excerpt.
393 size_t prev_offset = offset;
395 VLOG(1) <<
"Candidate message rejected after " << prev_offset
396 <<
" bytes. Restarting search @ offset " << offset + 1 <<
"/"
397 << available_bytes <<
". [" << available_bytes - 1
398 <<
" candidate bytes remaining]";
401 next_byte_index_ = 0;
405 VLOG(1) <<
"Resynchronization finished. " << next_byte_index_
406 <<
" bytes remaining in buffer.";
408 return total_message_size;