From d62c48945382a440a5e91876a58a1d91e3da5b1d Mon Sep 17 00:00:00 2001 From: Mikhail Burakov Date: Sat, 7 Jan 2023 11:18:13 +0100 Subject: Add initial lua integration --- main.c | 215 ++++++++++++++++++++++------------------------------------------- 1 file changed, 72 insertions(+), 143 deletions(-) (limited to 'main.c') diff --git a/main.c b/main.c index 69eefa8..c4fc5c0 100644 --- a/main.c +++ b/main.c @@ -15,12 +15,12 @@ * along with MQhTTp. If not, see . */ -//#include -#include -//#include -//#include -//#include #include +#include +#include +#include +#include +#include #include #include #include @@ -56,6 +56,7 @@ struct ServiceContext { int mqtt; struct IoMuxer io_muxer; struct Buffer mqtt_buffer; + lua_State* lua_state; struct ClientContext* clients; size_t messages_count; void* messages; @@ -237,6 +238,7 @@ static bool ServeHttpPost(int fd, const char* target, const void* content, LOGW("Topic is too long to fit into mqtt message"); return SendHttpReply(fd, "414 URI Too Long", NULL, NULL, 0); } + LOGD("%s -> %.*s", topic, (int)content_length, (const char*)content); if (!MqttPublish(g_service.mqtt, topic, (uint16_t)topic_size, content, content_length)) { LOGW("Failed to publish mqtt message (%s)", strerror(errno)); @@ -364,6 +366,38 @@ static void OnMqttConnectAck(void* user, bool success) { } } +static int LuaPublish(lua_State* lua_state) { + int result = 0; + size_t topic_length; + const char* topic = luaL_checklstring(lua_state, 1, &topic_length); + if (!topic || topic_length > UINT16_MAX) { + LOGW("Invalid topic argument for publish call"); + goto bail_out; + } + size_t payload_length; + const char* payload = luaL_checklstring(lua_state, 2, &payload_length); + if (!payload) { + LOGW("Invalid payload argument for publish call"); + goto bail_out; + } + LOGD("%.*s -> %.*s", (int)topic_length, topic, (int)payload_length, payload); + if (!MqttPublish(g_service.mqtt, topic, (uint16_t)topic_length, payload, + payload_length)) { + LOGW("Failed to publish mqtt message (%s)", strerror(errno)); + goto bail_out; + } + result = 1; + +bail_out: + lua_pushboolean(lua_state, result); + return 1; +} + +static int LuaSubscribe(lua_State* lua_state) { + // TODO(mburakov): Implement me! + return 0; +} + static void OnMqttSubscribeAck(void* user, bool success) { (void)user; if (!success) { @@ -371,6 +405,38 @@ static void OnMqttSubscribeAck(void* user, bool success) { OnSignal(SIGTERM); return; } + + g_service.lua_state = luaL_newstate(); + if (!g_service.lua_state) { + LOGW("Failed to allocate lua state (%s)", strerror(errno)); + OnSignal(SIGTERM); + return; + } + + luaL_openlibs(g_service.lua_state); + lua_pushcfunction(g_service.lua_state, LuaPublish); + lua_setglobal(g_service.lua_state, "publish"); + lua_pushcfunction(g_service.lua_state, LuaSubscribe); + lua_setglobal(g_service.lua_state, "subscribe"); + + DIR* current_dir = opendir("."); + if (!current_dir) { + LOGW("Failed to open current dir (%s)", strerror(errno)); + OnSignal(SIGTERM); + return; + } + for (struct dirent* item; (item = readdir(current_dir));) { + if (item->d_type != DT_REG) continue; + size_t length = strlen(item->d_name); + static const char lua_ext[] = {'.', 'l', 'u', 'a'}; + if (length < sizeof(lua_ext)) continue; + const char* ext = item->d_name + length - sizeof(lua_ext); + if (memcmp(ext, lua_ext, sizeof(lua_ext))) continue; + LOGI("Sourcing %s...", item->d_name); + if (luaL_dofile(g_service.lua_state, item->d_name)) + LOGW("%s", lua_tostring(g_service.lua_state, -1)); + } + closedir(current_dir); } static void OnMqttPublish(void* user, const char* topic, size_t topic_size, @@ -527,6 +593,7 @@ int main(int argc, char* argv[]) { } tdestroy(g_service.messages, MessageDestroy); while (g_service.clients) DropClientContext(g_service.clients); + if (g_service.lua_state) lua_close(g_service.lua_state); BufferDestroy(&g_service.mqtt_buffer); destroy_iomuxer: @@ -539,142 +606,4 @@ close_http: close(g_service.http); bool success = g_signal == SIGINT || g_signal == SIGTERM; return success ? EXIT_SUCCESS : EXIT_FAILURE; - -#if 0 - if (argc < 3) Terminate("Usage: %s ", argv[0]); - int port = atoi(argv[2]); - if (0 >= port || port >= 65536) - Terminate("Invalid mosquitto port \"%s\"", argv[2]); - int epfd = epoll_create(1); - if (epfd == -1) Terminate("Failed to create epoll (%s)", strerror(errno)); - int result = EXIT_FAILURE; - struct Context context; - memset(&context, 0, sizeof(struct Context)); - struct Server* server = ServerCreate(epfd, HandleRequest, &context); - if (!server) { - Log("Failed to create server"); - goto rollback_epoll_create; - } - int mosq_errno = mosquitto_lib_init(); - if (mosq_errno != MOSQ_ERR_SUCCESS) { - Log("Failed to initialize mosquitto lib (%s)", - mosquitto_strerror(mosq_errno)); - goto rollback_server_create; - } - context.mosq = mosquitto_new(NULL, true, &context); - if (!context.mosq) { - Log("Failed to create mosquitto (%s)", strerror(errno)); - goto rollback_mosquitto_lib_init; - } - mosquitto_message_callback_set(context.mosq, HandleMosquitto); - mosq_errno = mosquitto_connect(context.mosq, argv[1], port, 60); - if (mosq_errno != MOSQ_ERR_SUCCESS) { - Log("Failed to connect mosquitto (%s)", mosquitto_strerror(mosq_errno)); - goto rollback_mosquitto_new; - } - mosq_errno = mosquitto_subscribe(context.mosq, NULL, "+/#", 0); - if (mosq_errno != MOSQ_ERR_SUCCESS) { - Log("Failed to subscribe mosquitto (%s)", mosquitto_strerror(mosq_errno)); - goto rollback_mosquitto_connect; - } - int mosq_sock = mosquitto_socket(context.mosq); - if (mosq_sock == -1) { - Log("Failed to get mosquitto socket"); - goto rollback_mosquitto_connect; - } - struct epoll_event ev = {.events = EPOLLIN, .data.fd = mosq_sock}; - if (epoll_ctl(epfd, EPOLL_CTL_ADD, mosq_sock, &ev)) { - Log("Failed to add mosquitto to epoll (%s)", strerror(errno)); - goto rollback_mosquitto_connect; - } - context.lua_state = luaL_newstate(); - if (!context.lua_state) { - Log("Failed to create lua state"); - goto rollback_mosquitto_connect; - } - luaL_openlibs(context.lua_state); - // TODO(mburakov): Handle lua errors. -#if 0 - // mburakov: Userdata is broken on AArch64 - lua_pushlightuserdata(context.lua_state, &context); -#else - void* ctx = &context; - lua_pushlstring(context.lua_state, (const char*)&ctx, sizeof(void*)); -#endif - lua_pushcclosure(context.lua_state, LuaSubscribe, 1); - lua_setglobal(context.lua_state, "subscribe"); -#if 0 - // mburakov: Userdata is broken on AArch64 - lua_pushlightuserdata(context.lua_state, &context); -#else - lua_pushlstring(context.lua_state, (const char*)&ctx, sizeof(void*)); -#endif - lua_pushcclosure(context.lua_state, LuaPublish, 1); - lua_setglobal(context.lua_state, "publish"); - SourceCurrentDir(context.lua_state); - static const struct sigaction sa = {.sa_handler = OnSignal}; - if (sigaction(SIGINT, &sa, NULL) || sigaction(SIGTERM, &sa, NULL)) { - Log("Failed to set up signal handlers (%s)", strerror(errno)); - goto rollback_lual_newstate; - } - while (!g_shutdown) { - switch (epoll_wait(epfd, &ev, 1, 1000)) { - case -1: - Log("Failed to wait epoll (%s)", strerror(errno)); - if (errno != EINTR) goto rollback_lual_newstate; - continue; - case 0: - mosq_errno = mosquitto_loop_misc(context.mosq); - if (mosq_errno != MOSQ_ERR_SUCCESS) { - Log("Failed to loop mosquitto (%s)", mosquitto_strerror(mosq_errno)); - goto rollback_lual_newstate; - } - continue; - default: - break; - } - if (ev.data.fd == mosq_sock) { - context.size = 0; - mosq_errno = mosquitto_loop_read(context.mosq, 1); - if (mosq_errno != MOSQ_ERR_SUCCESS) { - Log("Failed to read mosquitto (%s)", mosquitto_strerror(mosq_errno)); - goto rollback_lual_newstate; - } - // mburakov: Execute scheduled publishes if any. - for (const char* ptr = context.buffer; - ptr < context.buffer + context.size;) { - const char* topic = ptr; - ptr += strlen(ptr) + 1; - const char* payload = ptr; - size_t payload_size = strlen(payload); - ptr += payload_size + 1; - mosq_errno = mosquitto_publish(context.mosq, NULL, topic, - (int)payload_size, payload, 0, false); - if (mosq_errno != MOSQ_ERR_SUCCESS) { - Log("Failed to publish mosquitto (%s)", - mosquitto_strerror(mosq_errno)); - } - } - continue; - } - if (!ServerMaybeHandle(server, ev.data.fd)) - Terminate("Stray socket in epoll"); - } - result = EXIT_SUCCESS; -rollback_lual_newstate: - lua_close(context.lua_state); -rollback_mosquitto_connect: - mosquitto_disconnect(context.mosq); -rollback_mosquitto_new: - mosquitto_destroy(context.mosq); -rollback_mosquitto_lib_init: - mosquitto_lib_cleanup(); -rollback_server_create: - ServerDestroy(server); -rollback_epoll_create: - tdestroy(context.messages, FreeMessage); - free(context.buffer); - close(epfd); - return result; -#endif } -- cgit v1.2.3