diff options
| -rw-r--r-- | main.c | 179 |
1 files changed, 87 insertions, 92 deletions
@@ -44,7 +44,6 @@ struct ClientContext { size_t content_length; struct Buffer http_buffer; struct HttpParserState http_parser; - struct ServiceContext* service; struct ClientContext* next; }; @@ -57,6 +56,7 @@ struct ServiceContext { }; static volatile sig_atomic_t g_signal; +static struct ServiceContext g_service; static void OnSignal(int signal) { g_signal = signal; } @@ -100,7 +100,7 @@ static struct sockaddr_in GetMqttAddr(int argc, char* argv[]) { } static void DropClientContext(struct ClientContext* client) { - struct ClientContext** next = &client->service->clients; + struct ClientContext** next = &g_service.clients; while (*next != client) next = &(*next)->next; *next = client->next; @@ -157,6 +157,21 @@ static void OnHttpFinished(void* user, size_t offset) { BufferDiscard(&client->http_buffer, offset); } +static void ServeHttp(int fd, const char* method, const char* target, + const void* content, size_t content_length) { + LOGD("%s %s {%.*s}", method, target, (int)content_length, + (const char*)content); + static const char hurr_durr[] = + "HTTP/1.1 200 OK\r\n" + "Content-Type: text/html; charset=UTF-8\r\n" + "Content-Length: 51\r\n" + "\r\n" + "<!DOCTYPE html>\n<html><body>" + "hurr-durr" + "</body></html>"; + write(fd, hurr_durr, sizeof(hurr_durr) - 1); +} + static void OnClientRead(void* user) { struct ClientContext* client = user; switch (BufferAppendFrom(&client->http_buffer, client->sock)) { @@ -173,64 +188,45 @@ static void OnClientRead(void* user) { .on_field = OnHttpField, .on_finished = OnHttpFinished, }; - switch (HttpParserParse(&client->http_parser, client->http_buffer.data, - client->http_buffer.size, &http_callbacks, user)) { - case kHttpParserResultFinished: - break; - case kHttpParserResultWantMore: - if (!IoMuxerOnRead(&client->service->io_muxer, client->sock, OnClientRead, - user)) { - LOGW("Failed to reschedule http client read (%s)", strerror(errno)); + for (;;) { + if (!client->http_parser.stage) { + if (client->content_length > client->http_buffer.size) + goto want_more_data; + ServeHttp(client->sock, client->method, client->target, + client->http_buffer.data, client->content_length); + HttpParserReset(&client->http_parser); + BufferDiscard(&client->http_buffer, client->content_length); + client->content_length = 0; + free(client->target); + client->target = NULL; + free(client->method); + client->method = NULL; + } + switch (HttpParserParse(&client->http_parser, client->http_buffer.data, + client->http_buffer.size, &http_callbacks, user)) { + case kHttpParserResultFinished: + continue; + case kHttpParserResultWantMore: + goto want_more_data; + case kHttpParserResultError: + LOGW("Failed to parse http request"); goto drop_client; - } - return; - case kHttpParserResultError: - LOGW("Failed to parse http request"); - goto drop_client; - default: - __builtin_unreachable(); - } - - HttpParserReset(&client->http_parser); - if (strcmp(client->method, "GET")) { - // TODO(mburakov): Unsupported method - goto drop_client; - } - if (client->content_length) { - // TODO(mburakov): Unsupported body - goto drop_client; - } - - static const char hurr_durr[] = - "HTTP/1.1 200 OK\r\n" - "Content-Type: text/html; charset=UTF-8\r\n" - "Content-Length: 51\r\n" - "\r\n" - "<!DOCTYPE html>\n<html><body>" - "hurr-durr" - "</body></html>"; - write(client->sock, hurr_durr, sizeof(hurr_durr) - 1); - - free(client->method); - client->method = NULL; - free(client->target); - client->target = NULL; - client->content_length = 0; - - if (!IoMuxerOnRead(&client->service->io_muxer, client->sock, OnClientRead, - user)) { - LOGW("Failed to reschedule http client read (%s)", strerror(errno)); - goto drop_client; + default: + __builtin_unreachable(); + } } - return; +want_more_data: + if (IoMuxerOnRead(&g_service.io_muxer, client->sock, OnClientRead, client)) + return; + LOGW("Failed to reschedule http client read (%s)", strerror(errno)); drop_client: DropClientContext(client); } static void OnHttpRead(void* user) { - struct ServiceContext* context = user; - int sock = accept(context->http, NULL, NULL); + (void)user; + int sock = accept(g_service.http, NULL, NULL); if (sock == -1) { LOGW("Failed to accept http client (%s)", strerror(errno)); return; @@ -240,7 +236,7 @@ static void OnHttpRead(void* user) { LOGW("Failed to allocate http client context (%s)", strerror(errno)); goto close_sock; } - if (!IoMuxerOnRead(&context->io_muxer, sock, OnClientRead, client)) { + if (!IoMuxerOnRead(&g_service.io_muxer, sock, OnClientRead, client)) { LOGW("Failed to schedule http client read (%s)", strerror(errno)); goto free_client; } @@ -251,14 +247,13 @@ static void OnHttpRead(void* user) { client->content_length = 0; BufferCreate(&client->http_buffer); HttpParserReset(&client->http_parser); - client->service = context; client->next = NULL; - struct ClientContext** next = &context->clients; + struct ClientContext** next = &g_service.clients; while (*next) next = &(*next)->next; *next = client; - if (!IoMuxerOnRead(&context->io_muxer, context->http, OnHttpRead, user)) { + if (!IoMuxerOnRead(&g_service.io_muxer, g_service.http, OnHttpRead, NULL)) { LOGE("Failed to reschedule http client accept (%s)", strerror(errno)); OnSignal(SIGABRT); } @@ -271,13 +266,13 @@ close_sock: } static void OnMqttConnectAck(void* user, bool success) { - struct ServiceContext* context = user; + (void)user; if (!success) { LOGW("Mqtt broker rejected connection"); OnSignal(SIGTERM); return; } - if (!MqttSubscribe(context->mqtt, 1, "+/#", 3)) { + if (!MqttSubscribe(g_service.mqtt, 1, "+/#", 3)) { LOGE("Failed to subscribe to mqtt topic (%s)", strerror(errno)); OnSignal(SIGABRT); return; @@ -302,13 +297,13 @@ static void OnMqttPublish(void* user, const char* topic, size_t topic_size, } static void OnMqttFinished(void* user, size_t offset) { - struct ServiceContext* context = user; - BufferDiscard(&context->mqtt_buffer, offset); + (void)user; + BufferDiscard(&g_service.mqtt_buffer, offset); } static void OnMqttRead(void* user) { - struct ServiceContext* context = user; - switch (BufferAppendFrom(&context->mqtt_buffer, context->mqtt)) { + (void)user; + switch (BufferAppendFrom(&g_service.mqtt_buffer, g_service.mqtt)) { case -1: LOGE("Failed to read mqtt socket (%s)", strerror(errno)); OnSignal(SIGABRT); @@ -328,14 +323,14 @@ static void OnMqttRead(void* user) { }; for (;;) { enum MqttParserResult result = - MqttParserParse(context->mqtt_buffer.data, context->mqtt_buffer.size, - &mqtt_callbacks, context); + MqttParserParse(g_service.mqtt_buffer.data, g_service.mqtt_buffer.size, + &mqtt_callbacks, NULL); switch (result) { case kMqttParserResultFinished: continue; case kMqttParserResultWantMore: - if (!IoMuxerOnRead(&context->io_muxer, context->mqtt, OnMqttRead, - context)) { + if (!IoMuxerOnRead(&g_service.io_muxer, g_service.mqtt, OnMqttRead, + NULL)) { LOGE("Failed to schedule mqtt read (%s)", strerror(errno)); OnSignal(SIGABRT); } @@ -351,42 +346,43 @@ static void OnMqttRead(void* user) { } int main(int argc, char* argv[]) { - struct sockaddr_in http_addr = GetHttpAddr(); - struct sockaddr_in mqtt_addr = GetMqttAddr(argc, argv); + const struct sockaddr_in http_addr = GetHttpAddr(); + const struct sockaddr_in mqtt_addr = GetMqttAddr(argc, argv); - struct ServiceContext context; - IoMuxerCreate(&context.io_muxer); - context.http = socket(AF_INET, SOCK_STREAM, 0); - if (context.http == -1) { + IoMuxerCreate(&g_service.io_muxer); + g_service.http = socket(AF_INET, SOCK_STREAM, 0); + if (g_service.http == -1) { LOGE("Failed to create http socket (%s)", strerror(errno)); return EXIT_FAILURE; } - int one = 1; - if (setsockopt(context.http, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) { + static const int one = 1; + if (setsockopt(g_service.http, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) { LOGE("Failed to reuse http socket (%s)", strerror(errno)); goto close_http; } - if (bind(context.http, (struct sockaddr*)&http_addr, sizeof(http_addr))) { + if (bind(g_service.http, (const struct sockaddr*)&http_addr, + sizeof(http_addr))) { LOGE("Failed to bind http socket (%s)", strerror(errno)); goto close_http; } - if (listen(context.http, SOMAXCONN)) { + if (listen(g_service.http, SOMAXCONN)) { LOGE("Failed to listen http socket (%s)", strerror(errno)); goto close_http; } - context.mqtt = socket(AF_INET, SOCK_STREAM, 0); - if (context.mqtt == -1) { + g_service.mqtt = socket(AF_INET, SOCK_STREAM, 0); + if (g_service.mqtt == -1) { LOGE("Failed to create mqtt socket (%s)", strerror(errno)); goto close_http; } - if (connect(context.mqtt, (struct sockaddr*)&mqtt_addr, sizeof(mqtt_addr))) { + if (connect(g_service.mqtt, (const struct sockaddr*)&mqtt_addr, + sizeof(mqtt_addr))) { LOGE("Failed to connect mqtt socket (%s)", strerror(errno)); goto close_mqtt; } // TODO(mburakov): Implement keepalive - if (!MqttConnect(context.mqtt, 65535)) { + if (!MqttConnect(g_service.mqtt, 65535)) { LOGE("Failed to connect mqtt (%s)", strerror(errno)); goto close_mqtt; } @@ -397,34 +393,33 @@ int main(int argc, char* argv[]) { goto disconnect_mqtt; } - IoMuxerCreate(&context.io_muxer); - if (!IoMuxerOnRead(&context.io_muxer, context.http, OnHttpRead, &context) || - !IoMuxerOnRead(&context.io_muxer, context.mqtt, OnMqttRead, &context)) { + IoMuxerCreate(&g_service.io_muxer); + if (!IoMuxerOnRead(&g_service.io_muxer, g_service.http, OnHttpRead, NULL) || + !IoMuxerOnRead(&g_service.io_muxer, g_service.mqtt, OnMqttRead, NULL)) { LOGE("Failed to init iomuxer (%s)", strerror(errno)); goto destroy_iomuxer; } - BufferCreate(&context.mqtt_buffer); + BufferCreate(&g_service.mqtt_buffer); while (!g_signal) { - enum IoMuxerResult result = IoMuxerIterate(&context.io_muxer, -1); + enum IoMuxerResult result = IoMuxerIterate(&g_service.io_muxer, -1); if (result == kIoMuxerResultError && errno != EINTR) { LOGE("Failed to iterate iomuxer (%s)", strerror(errno)); OnSignal(SIGABRT); } } - - while (context.clients) { - DropClientContext(context.clients); + while (g_service.clients) { + DropClientContext(g_service.clients); } destroy_iomuxer: - IoMuxerDestroy(&context.io_muxer); + IoMuxerDestroy(&g_service.io_muxer); disconnect_mqtt: - MqttDisconnect(context.mqtt); + MqttDisconnect(g_service.mqtt); close_mqtt: - close(context.mqtt); + close(g_service.mqtt); close_http: - close(context.http); + close(g_service.http); bool success = g_signal == SIGINT || g_signal == SIGTERM; return success ? EXIT_SUCCESS : EXIT_FAILURE; |
