diff options
author | Mikhail Burakov <mburakov@mailbox.org> | 2022-12-26 08:33:32 +0100 |
---|---|---|
committer | Mikhail Burakov <mburakov@mailbox.org> | 2022-12-26 08:33:32 +0100 |
commit | 56df239a929cc050f7c22ccf9010c657e65c4b9a (patch) | |
tree | 5c1be36eb4bbf8993c5387e37e6b48c2cfd8acab /mqtt_parser.c | |
parent | 16c0e4a3c8de3c96d29fd495c1aded36d8935759 (diff) |
Add new mqtt implementation to toolbox
Diffstat (limited to 'mqtt_parser.c')
-rw-r--r-- | mqtt_parser.c | 102 |
1 files changed, 102 insertions, 0 deletions
diff --git a/mqtt_parser.c b/mqtt_parser.c new file mode 100644 index 0000000..87e79b1 --- /dev/null +++ b/mqtt_parser.c @@ -0,0 +1,102 @@ +#include "mqtt_parser.h" + +#include <assert.h> +#include <setjmp.h> +#include <stdint.h> + +struct BufferView { + const uint8_t* data; + size_t size; +}; + +struct ConnectAck { + uint8_t packet_type; + uint8_t message_length; + uint8_t connack_flags; + uint8_t return_code; +} __attribute__((packed)); + +struct SubscribeAck { + uint8_t packet_type; + uint8_t message_length; + uint16_t packet_identifier; + uint8_t return_code; +} __attribute__((packed)); + +static_assert(sizeof(struct ConnectAck) == 4, "Unexpected connect ack size"); +static_assert(sizeof(struct SubscribeAck) == 5, + "Unexpected subscribe ack size"); + +static enum MqttParserResult ParseConnectAck( + const void* buffer, size_t buffer_size, + const struct MqttParserCallbacks* callbacks, void* user) { + if (buffer_size < sizeof(struct ConnectAck)) return kMqttParserResultWantMore; + const struct ConnectAck* connect_ack = buffer; + if (connect_ack->message_length != 2) return kMqttParserResultError; + callbacks->on_connect_ack(user, !connect_ack->return_code); + callbacks->on_finished(user, sizeof(struct ConnectAck)); + return kMqttParserResultFinished; +} + +static enum MqttParserResult ReadVarint(struct BufferView* buffer_view, + size_t* result) { + *result = 0; + for (size_t counter = 0; counter < 4; counter++) { + if (!buffer_view->size) return kMqttParserResultWantMore; + uint8_t byte = *buffer_view->data++; + buffer_view->size--; + *result |= (byte & 0x7full) << (7ull * counter); + if (~byte & 0x80) return kMqttParserResultFinished; + } + return kMqttParserResultError; +} + +static enum MqttParserResult ParsePublish( + const void* buffer, size_t buffer_size, + const struct MqttParserCallbacks* callbacks, void* user) { + size_t message_length; + const uint8_t* data = buffer; + struct BufferView publish = {.data = data + 1, .size = buffer_size - 1}; + enum MqttParserResult result = ReadVarint(&publish, &message_length); + if (result != kMqttParserResultFinished) return result; + if (publish.size < message_length) return kMqttParserResultWantMore; + publish.size = message_length; + + uint16_t topic_size = (publish.data[0] << 8 | publish.data[1]) & 0xffff; + const char* topic = (const char*)&publish.data[2]; + const uint8_t* payload = publish.data + 2 + topic_size; + size_t payload_size = publish.size - 2 - topic_size; + callbacks->on_publish(user, topic, topic_size, payload, payload_size); + callbacks->on_finished(user, (size_t)(publish.data + publish.size - data)); + return kMqttParserResultFinished; +} + +static enum MqttParserResult ParseSubscribeAck( + const void* buffer, size_t buffer_size, + const struct MqttParserCallbacks* callbacks, void* user) { + if (buffer_size < sizeof(struct SubscribeAck)) + return kMqttParserResultWantMore; + const struct SubscribeAck* subscribe_ack = buffer; + if (subscribe_ack->message_length != 3) return kMqttParserResultError; + callbacks->on_subscribe_ack(user, subscribe_ack->return_code < 3); + callbacks->on_finished(user, sizeof(struct SubscribeAck)); + return kMqttParserResultFinished; +} + +enum MqttParserResult MqttParserParse( + const void* buffer, size_t buffer_size, + const struct MqttParserCallbacks* callbacks, void* user) { + if (!buffer_size) return kMqttParserResultWantMore; + + const uint8_t* data = buffer; + switch (data[0] & 0xf0) { + case 0x20: + return ParseConnectAck(buffer, buffer_size, callbacks, user); + case 0x30: + return ParsePublish(buffer, buffer_size, callbacks, user); + case 0x90: + return ParseSubscribeAck(buffer, buffer_size, callbacks, user); + default: + return kMqttParserResultError; + } +} |