diff options
| author | Mikhail Burakov <mburakov@mailbox.org> | 2023-01-06 13:39:24 +0100 |
|---|---|---|
| committer | Mikhail Burakov <mburakov@mailbox.org> | 2023-01-06 13:39:24 +0100 |
| commit | 934df81c5296f61b6b4cbc5e9fd25c98cdd5993e (patch) | |
| tree | bdc8ac6c48075f6d0712b88c778bfcf41b0543a0 /main.c | |
| parent | d01adb0c57be549d50225cb3363e8e805053a7c6 (diff) | |
Preserve mqtt messages in service context
Diffstat (limited to 'main.c')
| -rw-r--r-- | main.c | 45 |
1 files changed, 39 insertions, 6 deletions
@@ -22,6 +22,7 @@ //#include <search.h> #include <arpa/inet.h> #include <netinet/in.h> +#include <search.h> #include <signal.h> #include <stdbool.h> #include <stdint.h> @@ -30,6 +31,7 @@ #include <sys/socket.h> #include <unistd.h> +#include "message.h" #include "toolbox/buffer.h" #include "toolbox/http_parser.h" #include "toolbox/io_muxer.h" @@ -53,6 +55,7 @@ struct ServiceContext { struct IoMuxer io_muxer; struct Buffer mqtt_buffer; struct ClientContext* clients; + void* messages; }; static volatile sig_atomic_t g_signal; @@ -105,8 +108,8 @@ static void DropClientContext(struct ClientContext* client) { *next = client->next; BufferDestroy(&client->http_buffer); - if (client->target) free(client->target); - if (client->method) free(client->method); + free(client->target); + free(client->method); close(client->sock); free(client); } @@ -293,7 +296,37 @@ static void OnMqttPublish(void* user, const char* topic, size_t topic_size, (void)user; LOGD("%.*s <- %.*s", (int)topic_size, topic, (int)payload_size, (const char*)payload); - // TODO(mburakov): Implement me! + void* payload_copy = malloc(payload_size); + if (!payload_copy) { + LOGW("Failed to copy payload (%s)", strerror(errno)); + return; + } + struct Message key = { + .topic = *(void**)(void*)(&topic), + .topic_size = topic_size, + }; + struct Message** node = tsearch(&key, &g_service.messages, MessageCompare); + if (!node) { + LOGW("Failed to create message node (%s)", strerror(errno)); + goto free_payload; + } + if (*node == &key) { + struct Message* message = MessageCreate(topic, topic_size); + if (!message) { + LOGW("Failed to create message (%s)", strerror(errno)); + goto delete_node; + } + *node = message; + } + free((*node)->payload); + (*node)->payload = payload_copy; + (*node)->payload_size = payload_size; + return; + +delete_node: + tdelete(&key, &g_service.messages, MessageCompare); +free_payload: + free(payload_copy); } static void OnMqttFinished(void* user, size_t offset) { @@ -408,9 +441,9 @@ int main(int argc, char* argv[]) { OnSignal(SIGABRT); } } - while (g_service.clients) { - DropClientContext(g_service.clients); - } + tdestroy(g_service.messages, MessageDestroy); + while (g_service.clients) DropClientContext(g_service.clients); + BufferDestroy(&g_service.mqtt_buffer); destroy_iomuxer: IoMuxerDestroy(&g_service.io_muxer); |
