summaryrefslogtreecommitdiff
path: root/main.c
diff options
context:
space:
mode:
Diffstat (limited to 'main.c')
-rw-r--r--main.c45
1 files changed, 39 insertions, 6 deletions
diff --git a/main.c b/main.c
index 7f1b8d5..d5d75fc 100644
--- a/main.c
+++ b/main.c
@@ -22,6 +22,7 @@
//#include <search.h>
#include <arpa/inet.h>
#include <netinet/in.h>
+#include <search.h>
#include <signal.h>
#include <stdbool.h>
#include <stdint.h>
@@ -30,6 +31,7 @@
#include <sys/socket.h>
#include <unistd.h>
+#include "message.h"
#include "toolbox/buffer.h"
#include "toolbox/http_parser.h"
#include "toolbox/io_muxer.h"
@@ -53,6 +55,7 @@ struct ServiceContext {
struct IoMuxer io_muxer;
struct Buffer mqtt_buffer;
struct ClientContext* clients;
+ void* messages;
};
static volatile sig_atomic_t g_signal;
@@ -105,8 +108,8 @@ static void DropClientContext(struct ClientContext* client) {
*next = client->next;
BufferDestroy(&client->http_buffer);
- if (client->target) free(client->target);
- if (client->method) free(client->method);
+ free(client->target);
+ free(client->method);
close(client->sock);
free(client);
}
@@ -293,7 +296,37 @@ static void OnMqttPublish(void* user, const char* topic, size_t topic_size,
(void)user;
LOGD("%.*s <- %.*s", (int)topic_size, topic, (int)payload_size,
(const char*)payload);
- // TODO(mburakov): Implement me!
+ void* payload_copy = malloc(payload_size);
+ if (!payload_copy) {
+ LOGW("Failed to copy payload (%s)", strerror(errno));
+ return;
+ }
+ struct Message key = {
+ .topic = *(void**)(void*)(&topic),
+ .topic_size = topic_size,
+ };
+ struct Message** node = tsearch(&key, &g_service.messages, MessageCompare);
+ if (!node) {
+ LOGW("Failed to create message node (%s)", strerror(errno));
+ goto free_payload;
+ }
+ if (*node == &key) {
+ struct Message* message = MessageCreate(topic, topic_size);
+ if (!message) {
+ LOGW("Failed to create message (%s)", strerror(errno));
+ goto delete_node;
+ }
+ *node = message;
+ }
+ free((*node)->payload);
+ (*node)->payload = payload_copy;
+ (*node)->payload_size = payload_size;
+ return;
+
+delete_node:
+ tdelete(&key, &g_service.messages, MessageCompare);
+free_payload:
+ free(payload_copy);
}
static void OnMqttFinished(void* user, size_t offset) {
@@ -408,9 +441,9 @@ int main(int argc, char* argv[]) {
OnSignal(SIGABRT);
}
}
- while (g_service.clients) {
- DropClientContext(g_service.clients);
- }
+ tdestroy(g_service.messages, MessageDestroy);
+ while (g_service.clients) DropClientContext(g_service.clients);
+ BufferDestroy(&g_service.mqtt_buffer);
destroy_iomuxer:
IoMuxerDestroy(&g_service.io_muxer);