From b2ccb9ff8fa7e7ad14143cfbf7e26ec79bfbfdc8 Mon Sep 17 00:00:00 2001 From: Mikhail Burakov Date: Mon, 26 Dec 2022 12:15:30 +0100 Subject: Initial commit for version 2 --- logging.c | 37 ------ logging.h | 33 ----- main.c | 428 +++++++++++++++++++++++++++----------------------------------- makefile | 20 +-- server.c | 237 ---------------------------------- server.h | 41 ------ toolbox | 2 +- uhttp.c | 263 -------------------------------------- uhttp.h | 50 -------- 9 files changed, 199 insertions(+), 912 deletions(-) delete mode 100644 logging.c delete mode 100644 logging.h delete mode 100644 server.c delete mode 100644 server.h delete mode 100644 uhttp.c delete mode 100644 uhttp.h diff --git a/logging.c b/logging.c deleted file mode 100644 index 4d72b88..0000000 --- a/logging.c +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (C) 2021 Mikhail Burakov. This file is part of MQhTTp. - * - * MQhTTp is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * MQhTTp is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with MQhTTp. If not, see . - */ - -#include "logging.h" - -#include -#include -#include - -void TerminateImpl(const char* fmt, ...) { - va_list args; - va_start(args, fmt); - vfprintf(stderr, fmt, args); - va_end(args); - exit(EXIT_FAILURE); -} - -void LogImpl(const char* fmt, ...) { - va_list args; - va_start(args, fmt); - vfprintf(stderr, fmt, args); - va_end(args); -} diff --git a/logging.h b/logging.h deleted file mode 100644 index 823880a..0000000 --- a/logging.h +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (C) 2021 Mikhail Burakov. This file is part of MQhTTp. - * - * MQhTTp is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * MQhTTp is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with MQhTTp. If not, see . - */ - -#ifndef MQHTTP_LOGGING_H_ -#define MQHTTP_LOGGING_H_ - -#define STR_IMPL(op) #op -#define STR(op) STR_IMPL(op) -#define LOGGING_WRAP(impl, fmt, ...) \ - impl(__FILE__ ":" STR(__LINE__) " " fmt "\n", ##__VA_ARGS__) -#define Terminate(fmt, ...) LOGGING_WRAP(TerminateImpl, fmt, ##__VA_ARGS__) -#define Log(fmt, ...) LOGGING_WRAP(LogImpl, fmt, ##__VA_ARGS__) - -_Noreturn void TerminateImpl(const char* fmt, ...) - __attribute__((__format__(printf, 1, 2))); - -void LogImpl(const char* fmt, ...) __attribute__((__format__(printf, 1, 2))); - -#endif // MQHTTP_LOGGING_H_ diff --git a/main.c b/main.c index 47c538e..ae85e2c 100644 --- a/main.c +++ b/main.c @@ -15,289 +15,236 @@ * along with MQhTTp. If not, see . */ -#include +//#include #include -#include -#include -#include -#include +//#include +//#include +//#include +#include +#include #include -#include +#include +#include #include #include -#include -#include +#include #include -#include "logging.h" -#include "server.h" +#include "toolbox/buffer.h" +#include "toolbox/io_muxer.h" +#include "toolbox/mqtt.h" +#include "toolbox/mqtt_parser.h" +#include "toolbox/utils.h" -#define UNCONST(op) ((void*)(uintptr_t)(op)) - -struct Message { - char* topic; - void* payload; - size_t payload_size; - int handler; -}; - -struct Context { - struct mosquitto* mosq; - void* messages; - char* buffer; - size_t size; - size_t alloc; - lua_State* lua_state; +struct ServiceContext { + int http; + int mqtt; + struct IoMuxer io_muxer; + struct Buffer mqtt_buffer; }; -static struct { - struct Context* context; - const char* topic; - size_t topic_len; -} g_twalk_context; +static volatile sig_atomic_t g_signal; -static volatile sig_atomic_t g_shutdown; +static void OnSignal(int signal) { g_signal = signal; } -static void OnSignal(int num) { - (void)num; - g_shutdown = 1; +static struct sockaddr_in GetHttpAddr() { + const char* sport = getenv("HTTP_PORT"); + int port = sport ? atoi(sport) : 8080; + if (0 > port || port > UINT16_MAX) { + LOGE("Invalid http port"); + } + const char* saddr = getenv("HTTP_ADDR"); + in_addr_t addr = inet_addr(saddr ? saddr : "0.0.0.0"); + if (addr == INADDR_NONE) { + LOGE("Invalid http addr"); + exit(EXIT_FAILURE); + } + struct sockaddr_in result = { + .sin_family = AF_INET, + .sin_port = htons((uint16_t)port), + .sin_addr.s_addr = addr, + }; + return result; } -static int CompareMessages(const void* a, const void* b) { - return strcmp(((const struct Message*)a)->topic, - ((const struct Message*)b)->topic); +static struct sockaddr_in GetMqttAddr(int argc, char* argv[]) { + int port = argc > 2 ? atoi(argv[2]) : 1883; + if (0 > port || port > UINT16_MAX) { + LOGE("Invalid mqtt port"); + exit(EXIT_FAILURE); + } + in_addr_t addr = inet_addr(argc > 1 ? argv[1] : "127.0.0.1"); + if (addr == INADDR_NONE) { + LOGE("Invalid mqtt addr"); + exit(EXIT_FAILURE); + } + struct sockaddr_in result = { + .sin_family = AF_INET, + .sin_port = htons((uint16_t)port), + .sin_addr.s_addr = addr, + }; + return result; } -static void FreeMessage(void* nodep) { - struct Message* message = nodep; - free(message->topic); - free(message->payload); +static void OnHttpRead(void* user) { + // TODO(mburakov): Implement me!!! } -static bool Flush(int fd, const char* status, const char* type, - const void* body, size_t body_size) { - char buffer[256]; - int length; - if (type) { - length = snprintf(buffer, sizeof(buffer), - "HTTP/1.1 %s\r\n" - "Content-Type: %s\r\n" - "Content-Length: %zu\r\n" - "\r\n", - status, type, body_size); - } else { - length = snprintf(buffer, sizeof(buffer), - "HTTP/1.1 %s\r\n" - "Content-Length: %zu\r\n" - "\r\n", - status, body_size); +static void OnMqttConnectAck(void* user, bool success) { + struct ServiceContext* context = user; + if (!success) { + LOGW("Mqtt broker rejected connection"); + OnSignal(SIGTERM); + return; } - // TODO(mburakov): Verify length is valid at this point. - struct iovec iov[] = {{.iov_base = buffer, .iov_len = (size_t)length}, - {.iov_base = UNCONST(body), .iov_len = body_size}}; - ssize_t result = writev(fd, iov, 2); - if (result != (ssize_t)(iov[0].iov_len + iov[1].iov_len)) { - Log("Failed to write complete reply (%s)", strerror(errno)); - return false; + if (!MqttSubscribe(context->mqtt, 1, "+/#", 3)) { + LOGE("Failed to subscribe to mqtt topic (%s)", strerror(errno)); + OnSignal(SIGABRT); + return; } - return true; } -static bool BufferAppend(struct Context* context, const char* data, - size_t size) { - size_t alloc = context->size + size; - if (context->alloc < alloc) { - char* buffer = realloc(context->buffer, alloc); - if (!buffer) { - Log("Failed to reallocate buffer (%s)", strerror(errno)); - return false; - } - context->buffer = buffer; - context->alloc = alloc; +static void OnMqttSubscribeAck(void* user, bool success) { + (void)user; + if (!success) { + LOGW("Mqtt broker rejected subscription"); + OnSignal(SIGTERM); + return; } - memcpy(context->buffer + context->size, data, size); - context->size += size; - return true; } -static void CollectMatchingMessages(const void* nodep, VISIT which, int level) { - (void)level; - const struct Message* const* it = nodep; - if (which == preorder || which == endorder || - strncmp((*it)->topic, g_twalk_context.topic, g_twalk_context.topic_len)) - return; - static const char kPre[] = "topic, strlen((*it)->topic)); - static const char kInt[] = "\">/"; - BufferAppend(g_twalk_context.context, kInt, strlen(kInt)); - BufferAppend(g_twalk_context.context, (*it)->topic, strlen((*it)->topic)); - static const char kPost[] = "
"; - BufferAppend(g_twalk_context.context, kPost, strlen(kPost)); +static void OnMqttPublish(void* user, const char* topic, size_t topic_size, + const void* payload, size_t payload_size) { + (void)user; + LOGD("%.*s <- %.*s", (int)topic_size, topic, (int)payload_size, + (const char*)payload); + // TODO(mburakov): Implement me! } -static bool HandleGetRequest(struct Context* context, int fd, - const char* target) { - if (!strcmp(target, "/favicon.ico")) - return Flush(fd, "404 Not Found", NULL, NULL, 0); - struct Message pred = {.topic = UNCONST(target + 1)}; - struct Message** it = tfind(&pred, &context->messages, CompareMessages); - if (it) { - return Flush(fd, "200 OK", "application/json", (*it)->payload, - (*it)->payload_size); - } - static const char kHeader[] = - "" - "" - "" - "MQhTTp" - "" - ""; - static const char kFooter[] = - "" - ""; - context->size = 0; - g_twalk_context.context = context; - g_twalk_context.topic = target + 1; - g_twalk_context.topic_len = strlen(target) - 1; - if (!BufferAppend(context, kHeader, sizeof(kHeader) - 1)) return false; - twalk(context->messages, CollectMatchingMessages); - if (!BufferAppend(context, kFooter, sizeof(kFooter) - 1)) return false; - return Flush(fd, "200 OK", "text/html", context->buffer, context->size); +static void OnMqttFinished(void* user, size_t offset) { + struct ServiceContext* context = user; + BufferDiscard(&context->mqtt_buffer, offset); } -static bool HandleRequest(void* user, int fd, const char* method, - const char* target, const void* body, - size_t body_size) { - struct Context* context = user; - if (!strcmp(method, "GET")) return HandleGetRequest(context, fd, target); - if (strcmp(method, "POST")) - return Flush(fd, "405 Method Not Allowed", NULL, NULL, 0); - if (!body_size || !target[1]) - return Flush(fd, "400 Bad Request", NULL, NULL, 0); - int mosq_errno = mosquitto_publish(context->mosq, NULL, target + 1, - (int)body_size, body, 0, false); - if (mosq_errno == MOSQ_ERR_SUCCESS) return Flush(fd, "200 OK", NULL, NULL, 0); - const char* error = mosquitto_strerror(mosq_errno); - return Flush(fd, "500 Internal Server Error", "text/plain", error, - strlen(error)); +static void OnMqttRead(void* user) { + struct ServiceContext* context = user; + switch (BufferAppendFrom(&context->mqtt_buffer, context->mqtt)) { + case -1: + LOGE("Failed to read mqtt socket (%s)", strerror(errno)); + OnSignal(SIGABRT); + return; + case 0: + LOGW("Mqtt broker closed connection"); + OnSignal(SIGTERM); + return; + default: + break; + } + static const struct MqttParserCallbacks mqtt_callbacks = { + .on_connect_ack = OnMqttConnectAck, + .on_subscribe_ack = OnMqttSubscribeAck, + .on_publish = OnMqttPublish, + .on_finished = OnMqttFinished, + }; + for (;;) { + enum MqttParserResult result = + MqttParserParse(context->mqtt_buffer.data, context->mqtt_buffer.size, + &mqtt_callbacks, context); + switch (result) { + case kMqttParserResultFinished: + continue; + case kMqttParserResultWantMore: + if (!IoMuxerOnRead(&context->io_muxer, context->mqtt, OnMqttRead, + context)) { + LOGE("Failed to schedule mqtt read (%s)", strerror(errno)); + OnSignal(SIGABRT); + } + return; + case kMqttParserResultError: + LOGE("Failed to parse mqtt message"); + OnSignal(SIGABRT); + return; + default: + __builtin_unreachable(); + } + } } -static struct Message* GetMessage(void** messages, const char* topic) { - struct Message pred = {.topic = UNCONST(topic)}; - struct Message** it = tsearch(&pred, messages, CompareMessages); - if (!it) { - Log("Failed to add message to the map"); - return NULL; +int main(int argc, char* argv[]) { + struct sockaddr_in http_addr = GetHttpAddr(); + struct sockaddr_in mqtt_addr = GetMqttAddr(argc, argv); + + struct ServiceContext context; + IoMuxerCreate(&context.io_muxer); + context.http = socket(AF_INET, SOCK_STREAM, 0); + if (context.http == -1) { + LOGE("Failed to create http socket (%s)", strerror(errno)); + return EXIT_FAILURE; } - if (*it != &pred) return *it; - struct Message* message = calloc(1, sizeof(struct Message)); - if (!message) { - Log("Failed to allocate new message (%s)", strerror(errno)); - goto rollback_tsearch; + + int one = 1; + if (setsockopt(context.http, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) { + LOGE("Failed to reuse http socket (%s)", strerror(errno)); + goto close_http; } - message->topic = strdup(topic); - if (!message->topic) { - Log("Failed to copy topic (%s)", strerror(errno)); - goto rollback_calloc; + if (bind(context.http, (struct sockaddr*)&http_addr, sizeof(http_addr))) { + LOGE("Failed to bind http socket (%s)", strerror(errno)); + goto close_http; } - *it = message; - return message; -rollback_calloc: - free(message); -rollback_tsearch: - tdelete(&pred, messages, CompareMessages); - return NULL; -} - -static void HandleMosquitto(struct mosquitto* mosq, void* user, - const struct mosquitto_message* mosq_msg) { - (void)mosq; - struct Context* context = user; - struct Message* message = GetMessage(&context->messages, mosq_msg->topic); - if (!message) { - Log("Failed to get message"); - return; + if (listen(context.http, SOMAXCONN)) { + LOGE("Failed to listen http socket (%s)", strerror(errno)); + goto close_http; } - size_t payload_size = (size_t)mosq_msg->payloadlen; - void* buffer = malloc(payload_size); - if (!buffer) { - Log("Failed to copy payload (%s)", strerror(errno)); - return; + context.mqtt = socket(AF_INET, SOCK_STREAM, 0); + if (context.mqtt == -1) { + LOGE("Failed to create mqtt socket (%s)", strerror(errno)); + goto close_http; } - memcpy(buffer, mosq_msg->payload, payload_size); - free(message->payload); - message->payload = buffer; - message->payload_size = payload_size; - if (message->handler) { - // TODO(mburakov): Handle lua errors. - lua_rawgeti(context->lua_state, LUA_REGISTRYINDEX, message->handler); - lua_pushlstring(context->lua_state, message->payload, - message->payload_size); - lua_pcall(context->lua_state, 1, 0, 0); + + if (connect(context.mqtt, (struct sockaddr*)&mqtt_addr, sizeof(mqtt_addr))) { + LOGE("Failed to connect mqtt socket (%s)", strerror(errno)); + goto close_mqtt; + } + // TODO(mburakov): Implement keepalive + if (!MqttConnect(context.mqtt, 65535)) { + LOGE("Failed to connect mqtt (%s)", strerror(errno)); + goto close_mqtt; } -} -static void SourceCurrentDir(lua_State* lua_state) { - DIR* current_dir = opendir("."); - if (!current_dir) Terminate("Failed to open current dir"); - for (struct dirent* item; (item = readdir(current_dir));) { - if (item->d_type != DT_REG) continue; - size_t length = strlen(item->d_name); - static const char kLuaExt[] = {'.', 'l', 'u', 'a'}; - if (length < sizeof(kLuaExt)) continue; - const char* ext = item->d_name + length - sizeof(kLuaExt); - if (memcmp(ext, kLuaExt, sizeof(kLuaExt))) continue; - Log("Sourcing %s...", item->d_name); - if (luaL_dofile(lua_state, item->d_name)) - Log("%s", lua_tostring(lua_state, -1)); + if (signal(SIGINT, OnSignal) == SIG_ERR || + signal(SIGTERM, OnSignal) == SIG_ERR) { + LOGE("Failed to set signal handlers (%s)", strerror(errno)); + goto disconnect_mqtt; } - closedir(current_dir); -} -static int LuaSubscribe(lua_State* lua_state) { - // TODO(mburakov): Handle lua errors. -#if 0 - // mburakov: Userdata is broken on AArch64 - struct Context* context = lua_touserdata(lua_state, lua_upvalueindex(1)); -#else - const void* ctx = lua_tolstring(lua_state, lua_upvalueindex(1), NULL); - struct Context* context = *(void* const*)ctx; -#endif - const char* topic = lua_tolstring(lua_state, -2, NULL); - struct Message* message = GetMessage(&context->messages, topic); - if (!message) { - Log("Failed to get message"); - return 0; + IoMuxerCreate(&context.io_muxer); + if (!IoMuxerOnRead(&context.io_muxer, context.http, OnHttpRead, &context) || + !IoMuxerOnRead(&context.io_muxer, context.mqtt, OnMqttRead, &context)) { + LOGE("Failed to init iomuxer (%s)", strerror(errno)); + goto destroy_iomuxer; } - message->handler = luaL_ref(lua_state, LUA_REGISTRYINDEX); - return 0; -} -static int LuaPublish(lua_State* lua_state) { - // TODO(mburakov): Handle lua errors. -#if 0 - // mburakov: Userdata is broken on AArch64 - struct Context* context = lua_touserdata(lua_state, lua_upvalueindex(1)); -#else - const void* ctx = lua_tolstring(lua_state, lua_upvalueindex(1), NULL); - struct Context* context = *(void* const*)ctx; -#endif - char* buffer = context->buffer; - size_t topic_size, payload_size; - const char* topic = lua_tolstring(lua_state, -2, &topic_size); - const char* payload = lua_tolstring(lua_state, -1, &payload_size); - // mburakov: Not allowed to publish from mosquitto callback. - if (!BufferAppend(context, topic, topic_size + 1) || - !BufferAppend(context, payload, payload_size + 1)) { - Log("Failed to schedule mosquitto publish"); - context->buffer = buffer; - return 0; + BufferCreate(&context.mqtt_buffer); + while (!g_signal) { + enum IoMuxerResult result = IoMuxerIterate(&context.io_muxer, -1); + if (result == kIoMuxerResultError && errno != EINTR) { + LOGE("Failed to iterate iomuxer (%s)", strerror(errno)); + OnSignal(SIGABRT); + } } - return 0; -} -int main(int argc, char* argv[]) { +destroy_iomuxer: + IoMuxerDestroy(&context.io_muxer); +disconnect_mqtt: + MqttDisconnect(context.mqtt); +close_mqtt: + close(context.mqtt); +close_http: + close(context.http); + bool success = g_signal == SIGINT || g_signal == SIGTERM; + return success ? EXIT_SUCCESS : EXIT_FAILURE; + +#if 0 if (argc < 3) Terminate("Usage: %s ", argv[0]); int port = atoi(argv[2]); if (0 >= port || port >= 65536) @@ -433,4 +380,5 @@ rollback_epoll_create: free(context.buffer); close(epfd); return result; +#endif } diff --git a/makefile b/makefile index 4f0cb04..674f704 100644 --- a/makefile +++ b/makefile @@ -2,16 +2,16 @@ bin:=$(notdir $(shell pwd)) src:=$(shell ls *.c) obj:=$(src:.c=.o) -libs?=luajit - -CFLAGS?=\ - -march=native -O3 -flto \ - -Wall -Wextra -pedantic \ - -D_GNU_SOURCE - -LDFLAGS?=\ - -O3 -s -flto \ - -lmosquitto +obj+=\ + toolbox/buffer.o \ + toolbox/http_parser.o \ + toolbox/io_muxer.o \ + toolbox/mqtt.o \ + toolbox/mqtt_parser.o \ + toolbox/utils.o + +libs:=\ + luajit CFLAGS+=$(shell pkg-config --cflags $(libs)) LDFLAGS+=$(shell pkg-config --libs $(libs)) diff --git a/server.c b/server.c deleted file mode 100644 index 7f647aa..0000000 --- a/server.c +++ /dev/null @@ -1,237 +0,0 @@ -/* - * Copyright (C) 2021 Mikhail Burakov. This file is part of MQhTTp. - * - * MQhTTp is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * MQhTTp is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with MQhTTp. If not, see . - */ - -#include "server.h" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "logging.h" -#include "uhttp.h" - -struct Client { - int fd; - struct Uhttp* uhttp; -}; - -struct Server { - int epfd; - ServerHandler handler; - void* user; - int fd; - void* clients; -}; - -static int CompareClients(const void* a, const void* b) { - int fda = ((const struct Client*)a)->fd; - int fdb = ((const struct Client*)b)->fd; - return (fda > fdb) - (fda < fdb); -} - -static void FreeClient(void* nodep) { - struct Client* client = nodep; - if (client->uhttp) UhttpDestroy(client->uhttp); - close(client->fd); - free(client); -} - -static in_port_t GetPort() { - static const in_port_t kDefaultPort = 8080; - const char* http_port = getenv("HTTP_PORT"); - if (!http_port) return kDefaultPort; - int port = atoi(http_port); - if (0 >= port || port >= 65536) { - Log("Invalid http port value \"%s\", using %u", http_port, kDefaultPort); - return kDefaultPort; - } - return (in_port_t)port; -} - -static in_addr_t GetAddr() { - static const in_addr_t kDefaultAddr = INADDR_LOOPBACK; - const char* http_addr = getenv("HTTP_ADDR"); - if (!http_addr) return kDefaultAddr; - in_addr_t addr = inet_addr(http_addr); - if (addr == INADDR_NONE) { - Log("Invalid http addr value \"%s\", using loopback", http_addr); - return kDefaultAddr; - } - return ntohl(addr); -} - -static int MakeServerSocket() { - int sock = socket(AF_INET, SOCK_STREAM, 0); - if (sock == -1) { - Log("Failed to create socket (%s)", strerror(errno)); - return -1; - } - int one = 1; - if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one))) { - Log("Failed to reuse address (%s)", strerror(errno)); - goto rollback_socket; - } - struct sockaddr_in addr = {.sin_family = AF_INET, - .sin_port = htons(GetPort()), - .sin_addr.s_addr = htonl(GetAddr())}; - if (bind(sock, (struct sockaddr*)&addr, sizeof(addr))) { - Log("Failed to bind socket (%s)", strerror(errno)); - goto rollback_socket; - } - if (listen(sock, SOMAXCONN)) { - Log("Failed to listen socket (%s)", strerror(errno)); - goto rollback_socket; - } - return sock; -rollback_socket: - close(sock); - return -1; -} - -static void AcceptClient(struct Server* server) { - struct Client* client = calloc(1, sizeof(struct Client)); - if (!client) { - Log("Failed to allocate client (%s)", strerror(errno)); - return; - } - client->fd = accept(server->fd, NULL, NULL); - if (client->fd == -1) { - Log("Failed to accept client (%s)", strerror(errno)); - goto rollback_calloc; - } - struct epoll_event ev = {.events = EPOLLIN, .data.fd = client->fd}; - if (epoll_ctl(server->epfd, EPOLL_CTL_ADD, client->fd, &ev)) { - Log("Failed to add client to epoll (%s)", strerror(errno)); - goto rollback_accept; - } - struct Client** it = tsearch(client, &server->clients, CompareClients); - if (!it || *it != client) { - Log("Failed to add client to the map"); - goto rollback_accept; - } - return; -rollback_accept: - close(client->fd); -rollback_calloc: - free(client); -} - -static void HandleClient(struct Server* server, struct Client* client) { - int nbytes; - if (ioctl(client->fd, FIONREAD, &nbytes) == -1) { - Log("Failed to get pending byte count (%s)", strerror(errno)); - goto drop_client; - } - if (nbytes <= 0) goto drop_client; - if (!client->uhttp) { - client->uhttp = UhttpCreate(); - if (!client->uhttp) { - Log("Failed to create uhttp"); - goto drop_client; - } - } - void* buffer = UhttpAllocate(client->uhttp, (size_t)nbytes); - if (!buffer) { - Log("Failed to allocate uhttp buffer"); - goto drop_client; - } - ssize_t result = read(client->fd, buffer, (size_t)nbytes); - switch (result) { - case -1: - Log("Failed to read client (%s)", strerror(errno)); - __attribute__((fallthrough)); - case 0: - goto drop_client; - default: - break; - } - switch (UhttpConsume(client->uhttp, (size_t)result)) { - case kUhttpResultTerminate: - Terminate("Heap corruption possible"); - case kUhttpResultFailure: - Log("Failed to parse request"); - goto drop_client; - case kUhttpResultWantMore: - return; - case kUhttpResultFinished: - break; - } - if (!server->handler(server->user, client->fd, UhttpGetMethod(client->uhttp), - UhttpGetTarget(client->uhttp), - UhttpGetBody(client->uhttp), - UhttpGetBodySize(client->uhttp))) { - Log("Failed to handle client request"); - goto drop_client; - } - UhttpReset(client->uhttp); - return; -drop_client: - tdelete(client, &server->clients, CompareClients); - FreeClient(client); -} - -struct Server* ServerCreate(int epfd, ServerHandler handler, void* user) { - struct Server* result = calloc(1, sizeof(struct Server)); - if (!result) { - Log("Failed to allocate server (%s)", strerror(errno)); - return NULL; - } - result->epfd = epfd; - result->handler = handler; - result->user = user; - result->fd = MakeServerSocket(); - if (result->fd == -1) { - Log("Failed to create server socket"); - goto rollback_calloc; - } - struct epoll_event ev = {.events = EPOLLIN, .data.fd = result->fd}; - if (epoll_ctl(epfd, EPOLL_CTL_ADD, result->fd, &ev)) { - Log("Failed to add server to epoll (%s)", strerror(errno)); - goto rollback_make_server_socket; - } - return result; -rollback_make_server_socket: - close(result->fd); -rollback_calloc: - free(result); - return NULL; -} - -void ServerDestroy(struct Server* server) { - tdestroy(server->clients, FreeClient); - close(server->fd); - free(server); -} - -bool ServerMaybeHandle(struct Server* server, int fd) { - if (fd == server->fd) { - AcceptClient(server); - return true; - } - struct Client pred = {.fd = fd}; - struct Client** it = tfind(&pred, &server->clients, CompareClients); - if (!it) return false; - HandleClient(server, *it); - return true; -} diff --git a/server.h b/server.h deleted file mode 100644 index 9f80359..0000000 --- a/server.h +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright (C) 2021 Mikhail Burakov. This file is part of MQhTTp. - * - * MQhTTp is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * MQhTTp is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with MQhTTp. If not, see . - */ - -#ifndef MQHTTP_SERVER_H_ -#define MQHTTP_SERVER_H_ - -#include -#include - -#ifdef __cplusplus -extern "C" { -#endif // __cplusplus - -typedef bool (*ServerHandler)(void* user, int fd, const char* method, - const char* target, const void* body, - size_t body_size); - -struct Server* ServerCreate(int epfd, ServerHandler handler, void* user); -void ServerDestroy(struct Server* server); - -bool ServerMaybeHandle(struct Server* server, int fd); - -#ifdef __cplusplus -} // extern "C" -#endif // __cplusplus - -#endif // MQHTTP_SERVER_H_ diff --git a/toolbox b/toolbox index da53f97..1b661ca 160000 --- a/toolbox +++ b/toolbox @@ -1 +1 @@ -Subproject commit da53f976dcd0f245e4803771c8668810a4de8691 +Subproject commit 1b661ca51f4bd42c9aeba8dbedff691d69623263 diff --git a/uhttp.c b/uhttp.c deleted file mode 100644 index 1d7e190..0000000 --- a/uhttp.c +++ /dev/null @@ -1,263 +0,0 @@ -/* - * Copyright (C) 2021 Mikhail Burakov. This file is part of MQhTTp. - * - * MQhTTp is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * MQhTTp is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with MQhTTp. If not, see . - */ - -#include "uhttp.h" - -#include -#include -#include -#include - -struct Uhttp { - struct { - char* data; - size_t alloc; - size_t size; - } buffer; - struct { - const char* method; - const char* target; - const char* body; - size_t body_size; - } public; - struct { - _Bool (*state)(struct Uhttp*, char*); - const char* field_name; - const char* field_value; - char* maybe_ows; - _Bool maybe_eol; - } parser; -}; - -static _Bool StrEq(const char* a, const char* b) { - for (; *a && *b; a++, b++) { - if (tolower(*a) != tolower(*b)) return 0; - } - return *a == *b; -} - -static _Bool UhttpOnMethod(struct Uhttp* uhttp, char* glyph); -static _Bool UhttpOnTarget(struct Uhttp* uhttp, char* glyph); -static _Bool UhttpOnVersion(struct Uhttp* uhttp, char* glyph); -static _Bool UhttpOnFieldName(struct Uhttp* uhttp, char* glyph); -static _Bool UhttpOnFieldValue(struct Uhttp* uhttp, char* glyph); -static _Bool UhttpOnBody(struct Uhttp* uhttp, char* glyph); -static _Bool UhttpOnStopped(struct Uhttp* uhttp, char* glyph); - -static _Bool UhttpOnMethod(struct Uhttp* uhttp, char* glyph) { - switch (*glyph) { - case ' ': - if (!uhttp->public.method) return 0; - uhttp->parser.state = UhttpOnTarget; - *glyph = 0; - return 1; - default: - if (!isgraph(*glyph)) return 0; - if (!uhttp->public.method) uhttp->public.method = glyph; - return 1; - } -} - -static _Bool UhttpOnTarget(struct Uhttp* uhttp, char* glyph) { - switch (*glyph) { - case ' ': - if (!uhttp->public.target) return 0; - uhttp->parser.state = UhttpOnVersion; - *glyph = 0; - return 1; - default: - if (!isgraph(*glyph)) return 0; - if (!uhttp->public.target) uhttp->public.target = glyph; - return 1; - } -} - -static _Bool UhttpOnVersion(struct Uhttp* uhttp, char* glyph) { - switch (*glyph) { - case '\r': - if (uhttp->parser.maybe_eol) return 0; - uhttp->parser.maybe_eol = 1; - return 1; - case '\n': - if (!uhttp->parser.maybe_eol) return 0; - uhttp->parser.state = UhttpOnFieldName; - uhttp->parser.maybe_eol = 0; - return 1; - default: - if (uhttp->parser.maybe_eol) return 0; - if (!isprint(*glyph)) return 0; - return 1; - } -} - -static _Bool UhttpOnFieldName(struct Uhttp* uhttp, char* glyph) { - switch (*glyph) { - case '\r': - if (uhttp->parser.maybe_eol) return 0; - uhttp->parser.maybe_eol = 1; - return 1; - case '\n': - if (!uhttp->parser.maybe_eol) return 0; - uhttp->parser.state = - uhttp->public.body_size ? UhttpOnBody : UhttpOnStopped; - return 1; - case ' ': - return 0; - case ':': - if (uhttp->parser.maybe_eol) return 0; - if (!uhttp->parser.field_name) return 0; - uhttp->parser.state = UhttpOnFieldValue; - uhttp->parser.field_value = NULL; - *glyph = 0; - return 1; - default: - if (uhttp->parser.maybe_eol) return 0; - if (!isgraph(*glyph)) return 0; - if (!uhttp->parser.field_name) uhttp->parser.field_name = glyph; - return 1; - } -} - -static _Bool UhttpOnFieldValue(struct Uhttp* uhttp, char* glyph) { - switch (*glyph) { - case '\r': - if (uhttp->parser.maybe_eol) return 0; - uhttp->parser.maybe_ows = glyph; - uhttp->parser.maybe_eol = 1; - return 1; - case '\n': - if (!uhttp->parser.maybe_eol) return 0; - if (!uhttp->parser.field_value) return 0; - *uhttp->parser.maybe_ows = 0; - if (!uhttp->public.body_size && - StrEq(uhttp->parser.field_name, "Content-Length")) - uhttp->public.body_size = (size_t)atoi(uhttp->parser.field_value); - uhttp->parser.state = UhttpOnFieldName; - uhttp->parser.field_name = NULL; - uhttp->parser.maybe_eol = 0; - return 1; - case '\t': - case ' ': - if (uhttp->parser.maybe_eol) return 0; - if (!uhttp->parser.field_value) return 1; - if (!uhttp->parser.maybe_ows) uhttp->parser.maybe_ows = glyph; - return 1; - default: - if (uhttp->parser.maybe_eol) return 0; - if (!isgraph(*glyph)) return 0; - if (!uhttp->parser.field_value) uhttp->parser.field_value = glyph; - uhttp->parser.maybe_ows = NULL; - return 1; - } -} - -static _Bool UhttpOnBody(struct Uhttp* uhttp, char* glyph) { - if (!uhttp->public.body) uhttp->public.body = glyph; - ptrdiff_t body_size = glyph - uhttp->public.body + 1; - if ((size_t)body_size == uhttp->public.body_size) - uhttp->parser.state = UhttpOnStopped; - return 1; -} - -static _Bool UhttpOnStopped(struct Uhttp* uhttp, char* glyph) { - (void)uhttp; - (void)glyph; - return 0; -} - -static void UhttpRelocateBuffer(struct Uhttp* uhttp, char* data) { - if (uhttp->public.method) - uhttp->public.method = data + (uhttp->public.method - uhttp->buffer.data); - if (uhttp->public.target) - uhttp->public.target = data + (uhttp->public.target - uhttp->buffer.data); - if (uhttp->public.body) - uhttp->public.body = data + (uhttp->public.body - uhttp->buffer.data); - if (uhttp->parser.field_name) { - uhttp->parser.field_name = - data + (uhttp->parser.field_name - uhttp->buffer.data); - } - if (uhttp->parser.field_value) { - uhttp->parser.field_value = - data + (uhttp->parser.field_value - uhttp->buffer.data); - } - if (uhttp->parser.maybe_ows) { - uhttp->parser.maybe_ows = - data + (uhttp->parser.maybe_ows - uhttp->buffer.data); - } - uhttp->buffer.data = data; -} - -struct Uhttp* UhttpCreate() { - struct Uhttp* result = calloc(1, sizeof(struct Uhttp)); - UhttpReset(result); - return result; -} - -void UhttpDestroy(struct Uhttp* uhttp) { - free(uhttp->buffer.data); - free(uhttp); -} - -void UhttpReset(struct Uhttp* uhttp) { - const struct Uhttp reset = {.buffer.data = uhttp->buffer.data, - .buffer.alloc = uhttp->buffer.alloc, - .parser.state = UhttpOnMethod}; - *uhttp = reset; -} - -void* UhttpAllocate(struct Uhttp* uhttp, size_t size) { - size_t alloc = uhttp->buffer.size + size; - if (alloc > uhttp->buffer.alloc) { - char* data = realloc(uhttp->buffer.data, alloc); - if (!data) return NULL; - if (data != uhttp->buffer.data) UhttpRelocateBuffer(uhttp, data); - uhttp->buffer.alloc = alloc; - } - return uhttp->buffer.data + uhttp->buffer.size; -} - -enum UhttpResult UhttpConsume(struct Uhttp* uhttp, size_t size) { - if (uhttp->buffer.size + size > uhttp->buffer.alloc) - return kUhttpResultTerminate; - size_t end = uhttp->buffer.size + size; - for (; uhttp->buffer.size < end; uhttp->buffer.size++) { - if (!uhttp->parser.state(uhttp, &uhttp->buffer.data[uhttp->buffer.size])) { - uhttp->parser.state = UhttpOnStopped; - return kUhttpResultFailure; - } - if (uhttp->parser.state == UhttpOnStopped) { - return kUhttpResultFinished; - } - } - return kUhttpResultWantMore; -} - -const char* UhttpGetMethod(const struct Uhttp* uhttp) { - return uhttp->public.method; -} - -const char* UhttpGetTarget(const struct Uhttp* uhttp) { - return uhttp->public.target; -} - -const char* UhttpGetBody(const struct Uhttp* uhttp) { - return uhttp->public.body; -} - -size_t UhttpGetBodySize(const struct Uhttp* uhttp) { - return uhttp->public.body_size; -} diff --git a/uhttp.h b/uhttp.h deleted file mode 100644 index 2cbb270..0000000 --- a/uhttp.h +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright (C) 2021 Mikhail Burakov. This file is part of MQhTTp. - * - * MQhTTp is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * MQhTTp is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with MQhTTp. If not, see . - */ - -#ifndef MQHTTP_UHTTP_H_ -#define MQHTTP_UHTTP_H_ - -#include - -#ifdef __cplusplus -extern "C" { -#endif // __cplusplus - -enum UhttpResult { - kUhttpResultTerminate, - kUhttpResultFailure, - kUhttpResultWantMore, - kUhttpResultFinished -}; - -struct Uhttp* UhttpCreate(void); -void UhttpDestroy(struct Uhttp* uhttp); - -void UhttpReset(struct Uhttp* uhttp); -void* UhttpAllocate(struct Uhttp* uhttp, size_t size); -enum UhttpResult UhttpConsume(struct Uhttp* uhttp, size_t size); - -const char* UhttpGetMethod(const struct Uhttp* uhttp); -const char* UhttpGetTarget(const struct Uhttp* uhttp); -const char* UhttpGetBody(const struct Uhttp* uhttp); -size_t UhttpGetBodySize(const struct Uhttp* uhttp); - -#ifdef __cplusplus -} // extern "C" -#endif // __cplusplus - -#endif // MQHTTP_UHTTP_H_ -- cgit v1.2.3