diff options
-rw-r--r-- | mqtt.c | 138 | ||||
-rw-r--r-- | mqtt.h | 41 | ||||
-rw-r--r-- | mqtt_parser.c | 102 | ||||
-rw-r--r-- | mqtt_parser.h | 50 |
4 files changed, 331 insertions, 0 deletions
@@ -0,0 +1,138 @@ +/* + * Copyright (C) 2022 Mikhail Burakov. This file is part of toolbox. + * + * toolbox is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * toolbox is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with toolbox. If not, see <https://www.gnu.org/licenses/>. + */ + +#include "mqtt.h" + +#include <arpa/inet.h> +#include <assert.h> +#include <sys/uio.h> +#include <unistd.h> + +#define LENGTH(op) (sizeof(op) / sizeof *(op)) +#define UNCONST(op) ((void*)(uintptr_t)(op)) + +static size_t WriteVarint(size_t varint, uint8_t* buffer) { + if (varint > 268435455) return 0; + size_t result = 0; + for (;;) { + buffer[result] = varint & 0x7f; + varint = varint >> 7; + if (varint) { + buffer[result] |= 0x80; + result++; + } else { + return result + 1; + } + } +} + +bool MqttConnect(int mqtt, uint16_t keepalive) { + struct __attribute__((__packed__)) { + uint8_t packet_type; + uint8_t message_length; + uint16_t protocol_name_length; + char protocol_name[4]; + uint8_t protocol_level; + uint8_t connect_flags; + uint16_t keepalive; + uint16_t client_id_length; + } connect_message = { + .packet_type = 0x10, + .message_length = 12, + .protocol_name_length = htons(4), + .protocol_name = {'M', 'Q', 'T', 'T'}, + .protocol_level = 4, + .connect_flags = 0x02, + .keepalive = htons(keepalive), + .client_id_length = 0, + }; + static_assert(sizeof(connect_message) == 14, + "Unexpected connect message size"); + return write(mqtt, &connect_message, sizeof(connect_message)) == + sizeof(connect_message); +} + +bool MqttSubscribe(int mqtt, uint16_t message_id, const char* topic, + uint16_t topic_size) { + uint8_t prefix[9] = {0x82}; + static const uint8_t qos[] = {0}; + size_t prefix_digits = WriteVarint( + sizeof(message_id) + sizeof(topic_size) + topic_size + sizeof(qos), + prefix + 1); + if (!prefix_digits) return false; + + uint8_t* ptr = prefix + 1 + prefix_digits; + *ptr++ = message_id >> 8 & 0xff; + *ptr++ = message_id & 0xff; + *ptr++ = topic_size >> 8 & 0xff; + *ptr++ = topic_size & 0xff; + + struct iovec iov[] = { + {.iov_base = prefix, .iov_len = (size_t)(ptr - prefix)}, + {.iov_base = UNCONST(topic), .iov_len = topic_size}, + {.iov_base = UNCONST(qos), .iov_len = sizeof(qos)}, + }; + return writev(mqtt, iov, LENGTH(iov)) == + (ssize_t)(iov[0].iov_len + iov[1].iov_len + iov[2].iov_len); +} + +bool MqttPublish(int mqtt, const char* topic, uint16_t topic_size, + const void* payload, size_t payload_size) { + uint8_t prefix[7] = {0x30}; + size_t prefix_digits = + WriteVarint(sizeof(topic_size) + topic_size + payload_size, prefix + 1); + if (!prefix_digits) return false; + + uint8_t* ptr = prefix + 1 + prefix_digits; + *ptr++ = topic_size >> 8 & 0xff; + *ptr++ = topic_size & 0xff; + + struct iovec iov[] = { + {.iov_base = prefix, .iov_len = (size_t)(ptr - prefix)}, + {.iov_base = UNCONST(topic), .iov_len = topic_size}, + {.iov_base = UNCONST(payload), .iov_len = payload_size}, + }; + return writev(mqtt, iov, LENGTH(iov)) == + (ssize_t)(iov[0].iov_len + iov[1].iov_len + iov[2].iov_len); +} + +bool MqttPing(int mqtt) { + struct __attribute__((__packed__)) { + uint8_t packet_type; + uint8_t message_length; + } ping_message = { + .packet_type = 0xd0, + .message_length = 0, + }; + static_assert(sizeof(ping_message) == 2, "Unexpected ping message size"); + return write(mqtt, &ping_message, sizeof(ping_message)) == + sizeof(ping_message); +} + +bool MqttDisconnect(int mqtt) { + struct __attribute__((__packed__)) { + uint8_t packet_type; + uint8_t message_length; + } disconnect_message = { + .packet_type = 0xe0, + .message_length = 0, + }; + static_assert(sizeof(disconnect_message) == 2, + "Unexpected disconnect message size"); + return write(mqtt, &disconnect_message, sizeof(disconnect_message)) == + sizeof(disconnect_message); +} @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2022 Mikhail Burakov. This file is part of toolbox. + * + * toolbox is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * toolbox is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with toolbox. If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef TOOLBOX_MQTT_H_ +#define TOOLBOX_MQTT_H_ + +#include <stdbool.h> +#include <stddef.h> +#include <stdint.h> + +#ifdef __cplusplus +extern "C" { +#endif // __cplusplus + +bool MqttConnect(int mqtt, uint16_t keepalive); +bool MqttSubscribe(int mqtt, uint16_t message_id, const char* topic, + uint16_t topic_size); +bool MqttPublish(int mqtt, const char* topic, uint16_t topic_size, + const void* payload, size_t payload_size); +bool MqttPing(int mqtt); +bool MqttDisconnect(int mqtt); + +#ifdef __cplusplus +} // extern "C" +#endif // __cplusplus + +#endif // TOOLBOX_MQTT_H_ diff --git a/mqtt_parser.c b/mqtt_parser.c new file mode 100644 index 0000000..87e79b1 --- /dev/null +++ b/mqtt_parser.c @@ -0,0 +1,102 @@ +#include "mqtt_parser.h" + +#include <assert.h> +#include <setjmp.h> +#include <stdint.h> + +struct BufferView { + const uint8_t* data; + size_t size; +}; + +struct ConnectAck { + uint8_t packet_type; + uint8_t message_length; + uint8_t connack_flags; + uint8_t return_code; +} __attribute__((packed)); + +struct SubscribeAck { + uint8_t packet_type; + uint8_t message_length; + uint16_t packet_identifier; + uint8_t return_code; +} __attribute__((packed)); + +static_assert(sizeof(struct ConnectAck) == 4, "Unexpected connect ack size"); +static_assert(sizeof(struct SubscribeAck) == 5, + "Unexpected subscribe ack size"); + +static enum MqttParserResult ParseConnectAck( + const void* buffer, size_t buffer_size, + const struct MqttParserCallbacks* callbacks, void* user) { + if (buffer_size < sizeof(struct ConnectAck)) return kMqttParserResultWantMore; + const struct ConnectAck* connect_ack = buffer; + if (connect_ack->message_length != 2) return kMqttParserResultError; + callbacks->on_connect_ack(user, !connect_ack->return_code); + callbacks->on_finished(user, sizeof(struct ConnectAck)); + return kMqttParserResultFinished; +} + +static enum MqttParserResult ReadVarint(struct BufferView* buffer_view, + size_t* result) { + *result = 0; + for (size_t counter = 0; counter < 4; counter++) { + if (!buffer_view->size) return kMqttParserResultWantMore; + uint8_t byte = *buffer_view->data++; + buffer_view->size--; + *result |= (byte & 0x7full) << (7ull * counter); + if (~byte & 0x80) return kMqttParserResultFinished; + } + return kMqttParserResultError; +} + +static enum MqttParserResult ParsePublish( + const void* buffer, size_t buffer_size, + const struct MqttParserCallbacks* callbacks, void* user) { + size_t message_length; + const uint8_t* data = buffer; + struct BufferView publish = {.data = data + 1, .size = buffer_size - 1}; + enum MqttParserResult result = ReadVarint(&publish, &message_length); + if (result != kMqttParserResultFinished) return result; + if (publish.size < message_length) return kMqttParserResultWantMore; + publish.size = message_length; + + uint16_t topic_size = (publish.data[0] << 8 | publish.data[1]) & 0xffff; + const char* topic = (const char*)&publish.data[2]; + const uint8_t* payload = publish.data + 2 + topic_size; + size_t payload_size = publish.size - 2 - topic_size; + callbacks->on_publish(user, topic, topic_size, payload, payload_size); + callbacks->on_finished(user, (size_t)(publish.data + publish.size - data)); + return kMqttParserResultFinished; +} + +static enum MqttParserResult ParseSubscribeAck( + const void* buffer, size_t buffer_size, + const struct MqttParserCallbacks* callbacks, void* user) { + if (buffer_size < sizeof(struct SubscribeAck)) + return kMqttParserResultWantMore; + const struct SubscribeAck* subscribe_ack = buffer; + if (subscribe_ack->message_length != 3) return kMqttParserResultError; + callbacks->on_subscribe_ack(user, subscribe_ack->return_code < 3); + callbacks->on_finished(user, sizeof(struct SubscribeAck)); + return kMqttParserResultFinished; +} + +enum MqttParserResult MqttParserParse( + const void* buffer, size_t buffer_size, + const struct MqttParserCallbacks* callbacks, void* user) { + if (!buffer_size) return kMqttParserResultWantMore; + + const uint8_t* data = buffer; + switch (data[0] & 0xf0) { + case 0x20: + return ParseConnectAck(buffer, buffer_size, callbacks, user); + case 0x30: + return ParsePublish(buffer, buffer_size, callbacks, user); + case 0x90: + return ParseSubscribeAck(buffer, buffer_size, callbacks, user); + default: + return kMqttParserResultError; + } +} diff --git a/mqtt_parser.h b/mqtt_parser.h new file mode 100644 index 0000000..684cca4 --- /dev/null +++ b/mqtt_parser.h @@ -0,0 +1,50 @@ +/* + * Copyright (C) 2022 Mikhail Burakov. This file is part of toolbox. + * + * toolbox is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * toolbox is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with toolbox. If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef TOOLBOX_MQTT_PARSER_H_ +#define TOOLBOX_MQTT_PARSER_H_ + +#include <stdbool.h> +#include <stddef.h> + +#ifdef __cplusplus +extern "C" { +#endif // __cplusplus + +enum MqttParserResult { + kMqttParserResultFinished = 0, + kMqttParserResultWantMore, + kMqttParserResultError, +}; + +struct MqttParserCallbacks { + void (*on_connect_ack)(void* user, bool success); + void (*on_subscribe_ack)(void* user, bool success); + void (*on_publish)(void* user, const char* topic, size_t topic_size, + const void* payload, size_t payload_size); + void (*on_finished)(void* user, size_t offset); +}; + +enum MqttParserResult MqttParserParse( + const void* buffer, size_t buffer_size, + const struct MqttParserCallbacks* callbacks, void* user); + +#ifdef __cplusplus +} // extern "C" +#endif // __cplusplus + +#endif // TOOLBOX_MQTT_PARSER_H_ |