diff options
| -rw-r--r-- | main.c | 45 | ||||
| -rw-r--r-- | makefile | 3 |
2 files changed, 42 insertions, 6 deletions
@@ -22,6 +22,7 @@ //#include <search.h> #include <arpa/inet.h> #include <netinet/in.h> +#include <search.h> #include <signal.h> #include <stdbool.h> #include <stdint.h> @@ -30,6 +31,7 @@ #include <sys/socket.h> #include <unistd.h> +#include "message.h" #include "toolbox/buffer.h" #include "toolbox/http_parser.h" #include "toolbox/io_muxer.h" @@ -53,6 +55,7 @@ struct ServiceContext { struct IoMuxer io_muxer; struct Buffer mqtt_buffer; struct ClientContext* clients; + void* messages; }; static volatile sig_atomic_t g_signal; @@ -105,8 +108,8 @@ static void DropClientContext(struct ClientContext* client) { *next = client->next; BufferDestroy(&client->http_buffer); - if (client->target) free(client->target); - if (client->method) free(client->method); + free(client->target); + free(client->method); close(client->sock); free(client); } @@ -293,7 +296,37 @@ static void OnMqttPublish(void* user, const char* topic, size_t topic_size, (void)user; LOGD("%.*s <- %.*s", (int)topic_size, topic, (int)payload_size, (const char*)payload); - // TODO(mburakov): Implement me! + void* payload_copy = malloc(payload_size); + if (!payload_copy) { + LOGW("Failed to copy payload (%s)", strerror(errno)); + return; + } + struct Message key = { + .topic = *(void**)(void*)(&topic), + .topic_size = topic_size, + }; + struct Message** node = tsearch(&key, &g_service.messages, MessageCompare); + if (!node) { + LOGW("Failed to create message node (%s)", strerror(errno)); + goto free_payload; + } + if (*node == &key) { + struct Message* message = MessageCreate(topic, topic_size); + if (!message) { + LOGW("Failed to create message (%s)", strerror(errno)); + goto delete_node; + } + *node = message; + } + free((*node)->payload); + (*node)->payload = payload_copy; + (*node)->payload_size = payload_size; + return; + +delete_node: + tdelete(&key, &g_service.messages, MessageCompare); +free_payload: + free(payload_copy); } static void OnMqttFinished(void* user, size_t offset) { @@ -408,9 +441,9 @@ int main(int argc, char* argv[]) { OnSignal(SIGABRT); } } - while (g_service.clients) { - DropClientContext(g_service.clients); - } + tdestroy(g_service.messages, MessageDestroy); + while (g_service.clients) DropClientContext(g_service.clients); + BufferDestroy(&g_service.mqtt_buffer); destroy_iomuxer: IoMuxerDestroy(&g_service.io_muxer); @@ -13,6 +13,9 @@ obj+=\ libs:=\ luajit +CFLAGS+=\ + -D_GNU_SOURCE + CFLAGS+=$(shell pkg-config --cflags $(libs)) LDFLAGS+=$(shell pkg-config --libs $(libs)) |
