#include "mqtt_parser.h"

#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Cursor over a read-only byte range: `data` points at the next unread byte
 * and `size` is the number of bytes remaining. */
struct BufferView {
  const uint8_t* data;
  size_t size;
};

/* Wire layout of a connect acknowledgement (fixed 4 bytes). */
struct ConnectAck {
  uint8_t packet_type;
  uint8_t message_length;
  uint8_t connack_flags;
  uint8_t return_code;
} __attribute__((packed));

/* Wire layout of a subscribe acknowledgement carrying one return code
 * (fixed 5 bytes). */
struct SubscribeAck {
  uint8_t packet_type;
  uint8_t message_length;
  uint16_t packet_identifier;
  uint8_t return_code;
} __attribute__((packed));

static_assert(sizeof(struct ConnectAck) == 4, "Unexpected connect ack size");
static_assert(sizeof(struct SubscribeAck) == 5,
              "Unexpected subscribe ack size");

/*
 * Parses a connect acknowledgement from the start of `buffer`.
 *
 * Returns kMqttParserResultWantMore when fewer than sizeof(struct ConnectAck)
 * bytes are available and kMqttParserResultError when the length byte is not
 * 2. Otherwise invokes on_connect_ack (success flag is true when the return
 * code is zero), then on_finished with the number of bytes consumed, and
 * returns kMqttParserResultFinished.
 */
static enum MqttParserResult ParseConnectAck(
    const void* buffer, size_t buffer_size,
    const struct MqttParserCallbacks* callbacks, void* user) {
  if (buffer_size < sizeof(struct ConnectAck)) {
    return kMqttParserResultWantMore;
  }

  const struct ConnectAck* connect_ack = buffer;
  if (connect_ack->message_length != 2) {
    return kMqttParserResultError;
  }

  callbacks->on_connect_ack(user, !connect_ack->return_code);
  callbacks->on_finished(user, sizeof(struct ConnectAck));
  return kMqttParserResultFinished;
}

/*
 * Decodes a variable-length integer from the front of `buffer_view`,
 * advancing the view past every byte it consumes.
 *
 * Each byte contributes its low 7 bits, least-significant group first; a set
 * high bit means another byte follows. At most 4 bytes are accepted.
 *
 * Returns kMqttParserResultFinished with the value in `*result`,
 * kMqttParserResultWantMore when the view runs out before the terminating
 * byte, or kMqttParserResultError when the 4th byte still has its
 * continuation bit set. `*result` holds a partial value on non-Finished
 * returns.
 */
static enum MqttParserResult ReadVarint(struct BufferView* buffer_view,
                                        size_t* result) {
  *result = 0;
  for (size_t counter = 0; counter < 4; counter++) {
    if (!buffer_view->size) {
      return kMqttParserResultWantMore;
    }
    uint8_t byte = *buffer_view->data++;
    buffer_view->size--;
    *result |= (byte & 0x7full) << (7ull * counter);
    if (~byte & 0x80) {
      return kMqttParserResultFinished;
    }
  }
  return kMqttParserResultError;
}

/*
 * Parses a publish packet from the start of `buffer`.
 *
 * Layout handled here: 1 header byte, a variable-length message length, a
 * 2-byte big-endian topic length, the topic bytes, and the remainder of the
 * message as payload.
 *
 * Returns kMqttParserResultWantMore when the buffer does not yet hold the
 * whole message, and kMqttParserResultError when the length prefix is
 * malformed or the declared lengths are inconsistent (message shorter than
 * the topic-length field, or topic longer than the message). On success it
 * invokes on_publish, then on_finished with the total bytes consumed, and
 * returns kMqttParserResultFinished. `topic` is not NUL-terminated; use
 * `topic_size`.
 *
 * NOTE(review): the header flag bits are not inspected, so if the protocol
 * places a packet identifier between topic and payload for some QoS levels
 * it will be delivered as part of the payload - confirm against callers.
 */
static enum MqttParserResult ParsePublish(
    const void* buffer, size_t buffer_size,
    const struct MqttParserCallbacks* callbacks, void* user) {
  /* Guard the `buffer_size - 1` below against wrapping. */
  if (!buffer_size) {
    return kMqttParserResultWantMore;
  }

  const uint8_t* data = buffer;
  struct BufferView publish = {.data = data + 1, .size = buffer_size - 1};

  size_t message_length;
  enum MqttParserResult result = ReadVarint(&publish, &message_length);
  if (result != kMqttParserResultFinished) {
    return result;
  }
  if (publish.size < message_length) {
    return kMqttParserResultWantMore;
  }
  /* Restrict the view to exactly this message. */
  publish.size = message_length;

  /* The message must at least contain the 2-byte topic length; without this
   * check the reads below would run past the message (and possibly the
   * buffer). */
  if (publish.size < 2) {
    return kMqttParserResultError;
  }
  uint16_t topic_size = (publish.data[0] << 8 | publish.data[1]) & 0xffff;

  /* The topic must fit inside the message; otherwise the payload pointer
   * would be out of range and the payload size would wrap around. */
  if (topic_size > publish.size - 2) {
    return kMqttParserResultError;
  }

  const char* topic = (const char*)&publish.data[2];
  const uint8_t* payload = publish.data + 2 + topic_size;
  size_t payload_size = publish.size - 2 - topic_size;

  callbacks->on_publish(user, topic, topic_size, payload, payload_size);
  callbacks->on_finished(user, (size_t)(publish.data + publish.size - data));
  return kMqttParserResultFinished;
}

/*
 * Parses a subscribe acknowledgement from the start of `buffer`.
 *
 * Returns kMqttParserResultWantMore when fewer than
 * sizeof(struct SubscribeAck) bytes are available and kMqttParserResultError
 * when the length byte is not 3. Otherwise invokes on_subscribe_ack (success
 * flag is true when the return code is below 3), then on_finished with the
 * number of bytes consumed, and returns kMqttParserResultFinished.
 */
static enum MqttParserResult ParseSubscribeAck(
    const void* buffer, size_t buffer_size,
    const struct MqttParserCallbacks* callbacks, void* user) {
  if (buffer_size < sizeof(struct SubscribeAck)) {
    return kMqttParserResultWantMore;
  }

  const struct SubscribeAck* subscribe_ack = buffer;
  if (subscribe_ack->message_length != 3) {
    return kMqttParserResultError;
  }

  callbacks->on_subscribe_ack(user, subscribe_ack->return_code < 3);
  callbacks->on_finished(user, sizeof(struct SubscribeAck));
  return kMqttParserResultFinished;
}

/*
 * Parses one packet from the start of `buffer`, dispatching on the high
 * nibble of the first byte: 0x20 connect ack, 0x30 publish, 0x90 subscribe
 * ack. Any other packet type yields kMqttParserResultError.
 *
 * Returns kMqttParserResultWantMore when the buffer is empty or holds only
 * part of a packet; no callbacks fire in that case. On
 * kMqttParserResultFinished the packet-specific callback has been invoked,
 * followed by on_finished with the number of bytes consumed.
 */
enum MqttParserResult MqttParserParse(
    const void* buffer, size_t buffer_size,
    const struct MqttParserCallbacks* callbacks, void* user) {
  if (!buffer_size) {
    return kMqttParserResultWantMore;
  }

  const uint8_t* data = buffer;
  switch (data[0] & 0xf0) {
    case 0x20:
      return ParseConnectAck(buffer, buffer_size, callbacks, user);
    case 0x30:
      return ParsePublish(buffer, buffer_size, callbacks, user);
    case 0x90:
      return ParseSubscribeAck(buffer, buffer_size, callbacks, user);
    default:
      return kMqttParserResultError;
  }
}