summaryrefslogtreecommitdiff
path: root/mqtt_parser.c
diff options
context:
space:
mode:
authorMikhail Burakov <mburakov@mailbox.org>2022-12-26 08:33:32 +0100
committerMikhail Burakov <mburakov@mailbox.org>2022-12-26 08:33:32 +0100
commit56df239a929cc050f7c22ccf9010c657e65c4b9a (patch)
tree5c1be36eb4bbf8993c5387e37e6b48c2cfd8acab /mqtt_parser.c
parent16c0e4a3c8de3c96d29fd495c1aded36d8935759 (diff)
Add new mqtt implementation to toolbox
Diffstat (limited to 'mqtt_parser.c')
-rw-r--r--mqtt_parser.c102
1 files changed, 102 insertions, 0 deletions
diff --git a/mqtt_parser.c b/mqtt_parser.c
new file mode 100644
index 0000000..87e79b1
--- /dev/null
+++ b/mqtt_parser.c
@@ -0,0 +1,102 @@
+#include "mqtt_parser.h"
+
+#include <assert.h>
+#include <setjmp.h>
+#include <stdint.h>
+
+struct BufferView {
+ const uint8_t* data;
+ size_t size;
+};
+
+struct ConnectAck {
+ uint8_t packet_type;
+ uint8_t message_length;
+ uint8_t connack_flags;
+ uint8_t return_code;
+} __attribute__((packed));
+
+struct SubscribeAck {
+ uint8_t packet_type;
+ uint8_t message_length;
+ uint16_t packet_identifier;
+ uint8_t return_code;
+} __attribute__((packed));
+
+static_assert(sizeof(struct ConnectAck) == 4, "Unexpected connect ack size");
+static_assert(sizeof(struct SubscribeAck) == 5,
+ "Unexpected subscribe ack size");
+
+static enum MqttParserResult ParseConnectAck(
+ const void* buffer, size_t buffer_size,
+ const struct MqttParserCallbacks* callbacks, void* user) {
+ if (buffer_size < sizeof(struct ConnectAck)) return kMqttParserResultWantMore;
+ const struct ConnectAck* connect_ack = buffer;
+ if (connect_ack->message_length != 2) return kMqttParserResultError;
+ callbacks->on_connect_ack(user, !connect_ack->return_code);
+ callbacks->on_finished(user, sizeof(struct ConnectAck));
+ return kMqttParserResultFinished;
+}
+
+static enum MqttParserResult ReadVarint(struct BufferView* buffer_view,
+ size_t* result) {
+ *result = 0;
+ for (size_t counter = 0; counter < 4; counter++) {
+ if (!buffer_view->size) return kMqttParserResultWantMore;
+ uint8_t byte = *buffer_view->data++;
+ buffer_view->size--;
+ *result |= (byte & 0x7full) << (7ull * counter);
+ if (~byte & 0x80) return kMqttParserResultFinished;
+ }
+ return kMqttParserResultError;
+}
+
+static enum MqttParserResult ParsePublish(
+ const void* buffer, size_t buffer_size,
+ const struct MqttParserCallbacks* callbacks, void* user) {
+ size_t message_length;
+ const uint8_t* data = buffer;
+ struct BufferView publish = {.data = data + 1, .size = buffer_size - 1};
+ enum MqttParserResult result = ReadVarint(&publish, &message_length);
+ if (result != kMqttParserResultFinished) return result;
+ if (publish.size < message_length) return kMqttParserResultWantMore;
+ publish.size = message_length;
+
+ uint16_t topic_size = (publish.data[0] << 8 | publish.data[1]) & 0xffff;
+ const char* topic = (const char*)&publish.data[2];
+ const uint8_t* payload = publish.data + 2 + topic_size;
+ size_t payload_size = publish.size - 2 - topic_size;
+ callbacks->on_publish(user, topic, topic_size, payload, payload_size);
+ callbacks->on_finished(user, (size_t)(publish.data + publish.size - data));
+ return kMqttParserResultFinished;
+}
+
+static enum MqttParserResult ParseSubscribeAck(
+ const void* buffer, size_t buffer_size,
+ const struct MqttParserCallbacks* callbacks, void* user) {
+ if (buffer_size < sizeof(struct SubscribeAck))
+ return kMqttParserResultWantMore;
+ const struct SubscribeAck* subscribe_ack = buffer;
+ if (subscribe_ack->message_length != 3) return kMqttParserResultError;
+ callbacks->on_subscribe_ack(user, subscribe_ack->return_code < 3);
+ callbacks->on_finished(user, sizeof(struct SubscribeAck));
+ return kMqttParserResultFinished;
+}
+
+enum MqttParserResult MqttParserParse(
+ const void* buffer, size_t buffer_size,
+ const struct MqttParserCallbacks* callbacks, void* user) {
+ if (!buffer_size) return kMqttParserResultWantMore;
+
+ const uint8_t* data = buffer;
+ switch (data[0] & 0xf0) {
+ case 0x20:
+ return ParseConnectAck(buffer, buffer_size, callbacks, user);
+ case 0x30:
+ return ParsePublish(buffer, buffer_size, callbacks, user);
+ case 0x90:
+ return ParseSubscribeAck(buffer, buffer_size, callbacks, user);
+ default:
+ return kMqttParserResultError;
+ }
+}