summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--mqtt.c138
-rw-r--r--mqtt.h41
-rw-r--r--mqtt_parser.c102
-rw-r--r--mqtt_parser.h50
4 files changed, 331 insertions, 0 deletions
diff --git a/mqtt.c b/mqtt.c
new file mode 100644
index 0000000..d5235c6
--- /dev/null
+++ b/mqtt.c
@@ -0,0 +1,138 @@
+/*
+ * Copyright (C) 2022 Mikhail Burakov. This file is part of toolbox.
+ *
+ * toolbox is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * toolbox is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with toolbox. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "mqtt.h"
+
+#include <arpa/inet.h>
+#include <assert.h>
+#include <sys/uio.h>
+#include <unistd.h>
+
+#define LENGTH(op) (sizeof(op) / sizeof *(op))
+#define UNCONST(op) ((void*)(uintptr_t)(op))
+
+static size_t WriteVarint(size_t varint, uint8_t* buffer) {
+ if (varint > 268435455) return 0;
+ size_t result = 0;
+ for (;;) {
+ buffer[result] = varint & 0x7f;
+ varint = varint >> 7;
+ if (varint) {
+ buffer[result] |= 0x80;
+ result++;
+ } else {
+ return result + 1;
+ }
+ }
+}
+
+bool MqttConnect(int mqtt, uint16_t keepalive) {
+ struct __attribute__((__packed__)) {
+ uint8_t packet_type;
+ uint8_t message_length;
+ uint16_t protocol_name_length;
+ char protocol_name[4];
+ uint8_t protocol_level;
+ uint8_t connect_flags;
+ uint16_t keepalive;
+ uint16_t client_id_length;
+ } connect_message = {
+ .packet_type = 0x10,
+ .message_length = 12,
+ .protocol_name_length = htons(4),
+ .protocol_name = {'M', 'Q', 'T', 'T'},
+ .protocol_level = 4,
+ .connect_flags = 0x02,
+ .keepalive = htons(keepalive),
+ .client_id_length = 0,
+ };
+ static_assert(sizeof(connect_message) == 14,
+ "Unexpected connect message size");
+ return write(mqtt, &connect_message, sizeof(connect_message)) ==
+ sizeof(connect_message);
+}
+
+bool MqttSubscribe(int mqtt, uint16_t message_id, const char* topic,
+ uint16_t topic_size) {
+ uint8_t prefix[9] = {0x82};
+ static const uint8_t qos[] = {0};
+ size_t prefix_digits = WriteVarint(
+ sizeof(message_id) + sizeof(topic_size) + topic_size + sizeof(qos),
+ prefix + 1);
+ if (!prefix_digits) return false;
+
+ uint8_t* ptr = prefix + 1 + prefix_digits;
+ *ptr++ = message_id >> 8 & 0xff;
+ *ptr++ = message_id & 0xff;
+ *ptr++ = topic_size >> 8 & 0xff;
+ *ptr++ = topic_size & 0xff;
+
+ struct iovec iov[] = {
+ {.iov_base = prefix, .iov_len = (size_t)(ptr - prefix)},
+ {.iov_base = UNCONST(topic), .iov_len = topic_size},
+ {.iov_base = UNCONST(qos), .iov_len = sizeof(qos)},
+ };
+ return writev(mqtt, iov, LENGTH(iov)) ==
+ (ssize_t)(iov[0].iov_len + iov[1].iov_len + iov[2].iov_len);
+}
+
+bool MqttPublish(int mqtt, const char* topic, uint16_t topic_size,
+ const void* payload, size_t payload_size) {
+ uint8_t prefix[7] = {0x30};
+ size_t prefix_digits =
+ WriteVarint(sizeof(topic_size) + topic_size + payload_size, prefix + 1);
+ if (!prefix_digits) return false;
+
+ uint8_t* ptr = prefix + 1 + prefix_digits;
+ *ptr++ = topic_size >> 8 & 0xff;
+ *ptr++ = topic_size & 0xff;
+
+ struct iovec iov[] = {
+ {.iov_base = prefix, .iov_len = (size_t)(ptr - prefix)},
+ {.iov_base = UNCONST(topic), .iov_len = topic_size},
+ {.iov_base = UNCONST(payload), .iov_len = payload_size},
+ };
+ return writev(mqtt, iov, LENGTH(iov)) ==
+ (ssize_t)(iov[0].iov_len + iov[1].iov_len + iov[2].iov_len);
+}
+
+bool MqttPing(int mqtt) {
+ struct __attribute__((__packed__)) {
+ uint8_t packet_type;
+ uint8_t message_length;
+ } ping_message = {
+ .packet_type = 0xd0,
+ .message_length = 0,
+ };
+ static_assert(sizeof(ping_message) == 2, "Unexpected ping message size");
+ return write(mqtt, &ping_message, sizeof(ping_message)) ==
+ sizeof(ping_message);
+}
+
+bool MqttDisconnect(int mqtt) {
+ struct __attribute__((__packed__)) {
+ uint8_t packet_type;
+ uint8_t message_length;
+ } disconnect_message = {
+ .packet_type = 0xe0,
+ .message_length = 0,
+ };
+ static_assert(sizeof(disconnect_message) == 2,
+ "Unexpected disconnect message size");
+ return write(mqtt, &disconnect_message, sizeof(disconnect_message)) ==
+ sizeof(disconnect_message);
+}
diff --git a/mqtt.h b/mqtt.h
new file mode 100644
index 0000000..6740ffc
--- /dev/null
+++ b/mqtt.h
@@ -0,0 +1,41 @@
+/*
+ * Copyright (C) 2022 Mikhail Burakov. This file is part of toolbox.
+ *
+ * toolbox is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * toolbox is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with toolbox. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#ifndef TOOLBOX_MQTT_H_
+#define TOOLBOX_MQTT_H_
+
+#include <stdbool.h>
+#include <stddef.h>
+#include <stdint.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif // __cplusplus
+
+bool MqttConnect(int mqtt, uint16_t keepalive);
+bool MqttSubscribe(int mqtt, uint16_t message_id, const char* topic,
+ uint16_t topic_size);
+bool MqttPublish(int mqtt, const char* topic, uint16_t topic_size,
+ const void* payload, size_t payload_size);
+bool MqttPing(int mqtt);
+bool MqttDisconnect(int mqtt);
+
+#ifdef __cplusplus
+} // extern "C"
+#endif // __cplusplus
+
+#endif // TOOLBOX_MQTT_H_
diff --git a/mqtt_parser.c b/mqtt_parser.c
new file mode 100644
index 0000000..87e79b1
--- /dev/null
+++ b/mqtt_parser.c
@@ -0,0 +1,102 @@
+#include "mqtt_parser.h"
+
+#include <assert.h>
+#include <setjmp.h>
+#include <stdint.h>
+
+struct BufferView {
+ const uint8_t* data;
+ size_t size;
+};
+
+struct ConnectAck {
+ uint8_t packet_type;
+ uint8_t message_length;
+ uint8_t connack_flags;
+ uint8_t return_code;
+} __attribute__((packed));
+
+struct SubscribeAck {
+ uint8_t packet_type;
+ uint8_t message_length;
+ uint16_t packet_identifier;
+ uint8_t return_code;
+} __attribute__((packed));
+
+static_assert(sizeof(struct ConnectAck) == 4, "Unexpected connect ack size");
+static_assert(sizeof(struct SubscribeAck) == 5,
+ "Unexpected subscribe ack size");
+
+static enum MqttParserResult ParseConnectAck(
+ const void* buffer, size_t buffer_size,
+ const struct MqttParserCallbacks* callbacks, void* user) {
+ if (buffer_size < sizeof(struct ConnectAck)) return kMqttParserResultWantMore;
+ const struct ConnectAck* connect_ack = buffer;
+ if (connect_ack->message_length != 2) return kMqttParserResultError;
+ callbacks->on_connect_ack(user, !connect_ack->return_code);
+ callbacks->on_finished(user, sizeof(struct ConnectAck));
+ return kMqttParserResultFinished;
+}
+
+static enum MqttParserResult ReadVarint(struct BufferView* buffer_view,
+ size_t* result) {
+ *result = 0;
+ for (size_t counter = 0; counter < 4; counter++) {
+ if (!buffer_view->size) return kMqttParserResultWantMore;
+ uint8_t byte = *buffer_view->data++;
+ buffer_view->size--;
+ *result |= (byte & 0x7full) << (7ull * counter);
+ if (~byte & 0x80) return kMqttParserResultFinished;
+ }
+ return kMqttParserResultError;
+}
+
+static enum MqttParserResult ParsePublish(
+ const void* buffer, size_t buffer_size,
+ const struct MqttParserCallbacks* callbacks, void* user) {
+ size_t message_length;
+ const uint8_t* data = buffer;
+ struct BufferView publish = {.data = data + 1, .size = buffer_size - 1};
+ enum MqttParserResult result = ReadVarint(&publish, &message_length);
+ if (result != kMqttParserResultFinished) return result;
+ if (publish.size < message_length) return kMqttParserResultWantMore;
+ publish.size = message_length;
+
+ uint16_t topic_size = (publish.data[0] << 8 | publish.data[1]) & 0xffff;
+ const char* topic = (const char*)&publish.data[2];
+ const uint8_t* payload = publish.data + 2 + topic_size;
+ size_t payload_size = publish.size - 2 - topic_size;
+ callbacks->on_publish(user, topic, topic_size, payload, payload_size);
+ callbacks->on_finished(user, (size_t)(publish.data + publish.size - data));
+ return kMqttParserResultFinished;
+}
+
+static enum MqttParserResult ParseSubscribeAck(
+ const void* buffer, size_t buffer_size,
+ const struct MqttParserCallbacks* callbacks, void* user) {
+ if (buffer_size < sizeof(struct SubscribeAck))
+ return kMqttParserResultWantMore;
+ const struct SubscribeAck* subscribe_ack = buffer;
+ if (subscribe_ack->message_length != 3) return kMqttParserResultError;
+ callbacks->on_subscribe_ack(user, subscribe_ack->return_code < 3);
+ callbacks->on_finished(user, sizeof(struct SubscribeAck));
+ return kMqttParserResultFinished;
+}
+
+enum MqttParserResult MqttParserParse(
+ const void* buffer, size_t buffer_size,
+ const struct MqttParserCallbacks* callbacks, void* user) {
+ if (!buffer_size) return kMqttParserResultWantMore;
+
+ const uint8_t* data = buffer;
+ switch (data[0] & 0xf0) {
+ case 0x20:
+ return ParseConnectAck(buffer, buffer_size, callbacks, user);
+ case 0x30:
+ return ParsePublish(buffer, buffer_size, callbacks, user);
+ case 0x90:
+ return ParseSubscribeAck(buffer, buffer_size, callbacks, user);
+ default:
+ return kMqttParserResultError;
+ }
+}
diff --git a/mqtt_parser.h b/mqtt_parser.h
new file mode 100644
index 0000000..684cca4
--- /dev/null
+++ b/mqtt_parser.h
@@ -0,0 +1,50 @@
+/*
+ * Copyright (C) 2022 Mikhail Burakov. This file is part of toolbox.
+ *
+ * toolbox is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * toolbox is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with toolbox. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#ifndef TOOLBOX_MQTT_PARSER_H_
+#define TOOLBOX_MQTT_PARSER_H_
+
+#include <stdbool.h>
+#include <stddef.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif // __cplusplus
+
+enum MqttParserResult {
+ kMqttParserResultFinished = 0,
+ kMqttParserResultWantMore,
+ kMqttParserResultError,
+};
+
+struct MqttParserCallbacks {
+ void (*on_connect_ack)(void* user, bool success);
+ void (*on_subscribe_ack)(void* user, bool success);
+ void (*on_publish)(void* user, const char* topic, size_t topic_size,
+ const void* payload, size_t payload_size);
+ void (*on_finished)(void* user, size_t offset);
+};
+
+enum MqttParserResult MqttParserParse(
+ const void* buffer, size_t buffer_size,
+ const struct MqttParserCallbacks* callbacks, void* user);
+
+#ifdef __cplusplus
+} // extern "C"
+#endif // __cplusplus
+
+#endif // TOOLBOX_MQTT_PARSER_H_