From 934df81c5296f61b6b4cbc5e9fd25c98cdd5993e Mon Sep 17 00:00:00 2001 From: Mikhail Burakov Date: Fri, 6 Jan 2023 13:39:24 +0100 Subject: Preserve mqtt messages in service context --- main.c | 45 +++++++++++++++++++++++++++++++++++++++------ 1 file changed, 39 insertions(+), 6 deletions(-) (limited to 'main.c') diff --git a/main.c b/main.c index 7f1b8d5..d5d75fc 100644 --- a/main.c +++ b/main.c @@ -22,6 +22,7 @@ //#include #include #include +#include #include #include #include @@ -30,6 +31,7 @@ #include #include +#include "message.h" #include "toolbox/buffer.h" #include "toolbox/http_parser.h" #include "toolbox/io_muxer.h" @@ -53,6 +55,7 @@ struct ServiceContext { struct IoMuxer io_muxer; struct Buffer mqtt_buffer; struct ClientContext* clients; + void* messages; }; static volatile sig_atomic_t g_signal; @@ -105,8 +108,8 @@ static void DropClientContext(struct ClientContext* client) { *next = client->next; BufferDestroy(&client->http_buffer); - if (client->target) free(client->target); - if (client->method) free(client->method); + free(client->target); + free(client->method); close(client->sock); free(client); } @@ -293,7 +296,37 @@ static void OnMqttPublish(void* user, const char* topic, size_t topic_size, (void)user; LOGD("%.*s <- %.*s", (int)topic_size, topic, (int)payload_size, (const char*)payload); - // TODO(mburakov): Implement me! + void* payload_copy = malloc(payload_size); + if (!payload_copy) { + LOGW("Failed to copy payload (%s)", strerror(errno)); + return; + } + struct Message key = { + .topic = *(void**)(void*)(&topic), + .topic_size = topic_size, + }; + struct Message** node = tsearch(&key, &g_service.messages, MessageCompare); + if (!node) { + LOGW("Failed to create message node (%s)", strerror(errno)); + goto free_payload; + } + if (*node == &key) { + struct Message* message = MessageCreate(topic, topic_size); + if (!message) { + LOGW("Failed to create message (%s)", strerror(errno)); + goto delete_node; + } + *node = message; + } + free((*node)->payload); + (*node)->payload = payload_copy; + (*node)->payload_size = payload_size; + return; + +delete_node: + tdelete(&key, &g_service.messages, MessageCompare); +free_payload: + free(payload_copy); } static void OnMqttFinished(void* user, size_t offset) { @@ -408,9 +441,9 @@ int main(int argc, char* argv[]) { OnSignal(SIGABRT); } } - while (g_service.clients) { - DropClientContext(g_service.clients); - } + tdestroy(g_service.messages, MessageDestroy); + while (g_service.clients) DropClientContext(g_service.clients); + BufferDestroy(&g_service.mqtt_buffer); destroy_iomuxer: IoMuxerDestroy(&g_service.io_muxer); -- cgit v1.2.3