summaryrefslogtreecommitdiff
path: root/main.c
diff options
context:
space:
mode:
authorMikhail Burakov <mburakov@mailbox.org>2023-01-07 11:18:13 +0100
committerMikhail Burakov <mburakov@mailbox.org>2023-01-07 11:18:13 +0100
commitd62c48945382a440a5e91876a58a1d91e3da5b1d (patch)
tree03fce6f1c644dc20e6ce83321b1d1d8020f529b3 /main.c
parentfe36fcc6fe66f43ab95485239f45170fff94576f (diff)
Add initial lua integration
Diffstat (limited to 'main.c')
-rw-r--r--main.c215
1 files changed, 72 insertions, 143 deletions
diff --git a/main.c b/main.c
index 69eefa8..c4fc5c0 100644
--- a/main.c
+++ b/main.c
@@ -15,12 +15,12 @@
* along with MQhTTp. If not, see <https://www.gnu.org/licenses/>.
*/
-//#include <dirent.h>
-#include <errno.h>
-//#include <lauxlib.h>
-//#include <lualib.h>
-//#include <search.h>
#include <arpa/inet.h>
+#include <dirent.h>
+#include <errno.h>
+#include <lauxlib.h>
+#include <lua.h>
+#include <lualib.h>
#include <netinet/in.h>
#include <search.h>
#include <signal.h>
@@ -56,6 +56,7 @@ struct ServiceContext {
int mqtt;
struct IoMuxer io_muxer;
struct Buffer mqtt_buffer;
+ lua_State* lua_state;
struct ClientContext* clients;
size_t messages_count;
void* messages;
@@ -237,6 +238,7 @@ static bool ServeHttpPost(int fd, const char* target, const void* content,
LOGW("Topic is too long to fit into mqtt message");
return SendHttpReply(fd, "414 URI Too Long", NULL, NULL, 0);
}
+ LOGD("%s -> %.*s", topic, (int)content_length, (const char*)content);
if (!MqttPublish(g_service.mqtt, topic, (uint16_t)topic_size, content,
content_length)) {
LOGW("Failed to publish mqtt message (%s)", strerror(errno));
@@ -364,6 +366,38 @@ static void OnMqttConnectAck(void* user, bool success) {
}
}
+static int LuaPublish(lua_State* lua_state) {
+ int result = 0;
+ size_t topic_length;
+ const char* topic = luaL_checklstring(lua_state, 1, &topic_length);
+ if (!topic || topic_length > UINT16_MAX) {
+ LOGW("Invalid topic argument for publish call");
+ goto bail_out;
+ }
+ size_t payload_length;
+ const char* payload = luaL_checklstring(lua_state, 2, &payload_length);
+ if (!payload) {
+ LOGW("Invalid payload argument for publish call");
+ goto bail_out;
+ }
+ LOGD("%.*s -> %.*s", (int)topic_length, topic, (int)payload_length, payload);
+ if (!MqttPublish(g_service.mqtt, topic, (uint16_t)topic_length, payload,
+ payload_length)) {
+ LOGW("Failed to publish mqtt message (%s)", strerror(errno));
+ goto bail_out;
+ }
+ result = 1;
+
+bail_out:
+ lua_pushboolean(lua_state, result);
+ return 1;
+}
+
+static int LuaSubscribe(lua_State* lua_state) {
+ // TODO(mburakov): Implement me!
+ return 0;
+}
+
static void OnMqttSubscribeAck(void* user, bool success) {
(void)user;
if (!success) {
@@ -371,6 +405,38 @@ static void OnMqttSubscribeAck(void* user, bool success) {
OnSignal(SIGTERM);
return;
}
+
+ g_service.lua_state = luaL_newstate();
+ if (!g_service.lua_state) {
+ LOGW("Failed to allocate lua state (%s)", strerror(errno));
+ OnSignal(SIGTERM);
+ return;
+ }
+
+ luaL_openlibs(g_service.lua_state);
+ lua_pushcfunction(g_service.lua_state, LuaPublish);
+ lua_setglobal(g_service.lua_state, "publish");
+ lua_pushcfunction(g_service.lua_state, LuaSubscribe);
+ lua_setglobal(g_service.lua_state, "subscribe");
+
+ DIR* current_dir = opendir(".");
+ if (!current_dir) {
+ LOGW("Failed to open current dir (%s)", strerror(errno));
+ OnSignal(SIGTERM);
+ return;
+ }
+ for (struct dirent* item; (item = readdir(current_dir));) {
+ if (item->d_type != DT_REG) continue;
+ size_t length = strlen(item->d_name);
+ static const char lua_ext[] = {'.', 'l', 'u', 'a'};
+ if (length < sizeof(lua_ext)) continue;
+ const char* ext = item->d_name + length - sizeof(lua_ext);
+ if (memcmp(ext, lua_ext, sizeof(lua_ext))) continue;
+ LOGI("Sourcing %s...", item->d_name);
+ if (luaL_dofile(g_service.lua_state, item->d_name))
+ LOGW("%s", lua_tostring(g_service.lua_state, -1));
+ }
+ closedir(current_dir);
}
static void OnMqttPublish(void* user, const char* topic, size_t topic_size,
@@ -527,6 +593,7 @@ int main(int argc, char* argv[]) {
}
tdestroy(g_service.messages, MessageDestroy);
while (g_service.clients) DropClientContext(g_service.clients);
+ if (g_service.lua_state) lua_close(g_service.lua_state);
BufferDestroy(&g_service.mqtt_buffer);
destroy_iomuxer:
@@ -539,142 +606,4 @@ close_http:
close(g_service.http);
bool success = g_signal == SIGINT || g_signal == SIGTERM;
return success ? EXIT_SUCCESS : EXIT_FAILURE;
-
-#if 0
- if (argc < 3) Terminate("Usage: %s <host> <port>", argv[0]);
- int port = atoi(argv[2]);
- if (0 >= port || port >= 65536)
- Terminate("Invalid mosquitto port \"%s\"", argv[2]);
- int epfd = epoll_create(1);
- if (epfd == -1) Terminate("Failed to create epoll (%s)", strerror(errno));
- int result = EXIT_FAILURE;
- struct Context context;
- memset(&context, 0, sizeof(struct Context));
- struct Server* server = ServerCreate(epfd, HandleRequest, &context);
- if (!server) {
- Log("Failed to create server");
- goto rollback_epoll_create;
- }
- int mosq_errno = mosquitto_lib_init();
- if (mosq_errno != MOSQ_ERR_SUCCESS) {
- Log("Failed to initialize mosquitto lib (%s)",
- mosquitto_strerror(mosq_errno));
- goto rollback_server_create;
- }
- context.mosq = mosquitto_new(NULL, true, &context);
- if (!context.mosq) {
- Log("Failed to create mosquitto (%s)", strerror(errno));
- goto rollback_mosquitto_lib_init;
- }
- mosquitto_message_callback_set(context.mosq, HandleMosquitto);
- mosq_errno = mosquitto_connect(context.mosq, argv[1], port, 60);
- if (mosq_errno != MOSQ_ERR_SUCCESS) {
- Log("Failed to connect mosquitto (%s)", mosquitto_strerror(mosq_errno));
- goto rollback_mosquitto_new;
- }
- mosq_errno = mosquitto_subscribe(context.mosq, NULL, "+/#", 0);
- if (mosq_errno != MOSQ_ERR_SUCCESS) {
- Log("Failed to subscribe mosquitto (%s)", mosquitto_strerror(mosq_errno));
- goto rollback_mosquitto_connect;
- }
- int mosq_sock = mosquitto_socket(context.mosq);
- if (mosq_sock == -1) {
- Log("Failed to get mosquitto socket");
- goto rollback_mosquitto_connect;
- }
- struct epoll_event ev = {.events = EPOLLIN, .data.fd = mosq_sock};
- if (epoll_ctl(epfd, EPOLL_CTL_ADD, mosq_sock, &ev)) {
- Log("Failed to add mosquitto to epoll (%s)", strerror(errno));
- goto rollback_mosquitto_connect;
- }
- context.lua_state = luaL_newstate();
- if (!context.lua_state) {
- Log("Failed to create lua state");
- goto rollback_mosquitto_connect;
- }
- luaL_openlibs(context.lua_state);
- // TODO(mburakov): Handle lua errors.
-#if 0
- // mburakov: Userdata is broken on AArch64
- lua_pushlightuserdata(context.lua_state, &context);
-#else
- void* ctx = &context;
- lua_pushlstring(context.lua_state, (const char*)&ctx, sizeof(void*));
-#endif
- lua_pushcclosure(context.lua_state, LuaSubscribe, 1);
- lua_setglobal(context.lua_state, "subscribe");
-#if 0
- // mburakov: Userdata is broken on AArch64
- lua_pushlightuserdata(context.lua_state, &context);
-#else
- lua_pushlstring(context.lua_state, (const char*)&ctx, sizeof(void*));
-#endif
- lua_pushcclosure(context.lua_state, LuaPublish, 1);
- lua_setglobal(context.lua_state, "publish");
- SourceCurrentDir(context.lua_state);
- static const struct sigaction sa = {.sa_handler = OnSignal};
- if (sigaction(SIGINT, &sa, NULL) || sigaction(SIGTERM, &sa, NULL)) {
- Log("Failed to set up signal handlers (%s)", strerror(errno));
- goto rollback_lual_newstate;
- }
- while (!g_shutdown) {
- switch (epoll_wait(epfd, &ev, 1, 1000)) {
- case -1:
- Log("Failed to wait epoll (%s)", strerror(errno));
- if (errno != EINTR) goto rollback_lual_newstate;
- continue;
- case 0:
- mosq_errno = mosquitto_loop_misc(context.mosq);
- if (mosq_errno != MOSQ_ERR_SUCCESS) {
- Log("Failed to loop mosquitto (%s)", mosquitto_strerror(mosq_errno));
- goto rollback_lual_newstate;
- }
- continue;
- default:
- break;
- }
- if (ev.data.fd == mosq_sock) {
- context.size = 0;
- mosq_errno = mosquitto_loop_read(context.mosq, 1);
- if (mosq_errno != MOSQ_ERR_SUCCESS) {
- Log("Failed to read mosquitto (%s)", mosquitto_strerror(mosq_errno));
- goto rollback_lual_newstate;
- }
- // mburakov: Execute scheduled publishes if any.
- for (const char* ptr = context.buffer;
- ptr < context.buffer + context.size;) {
- const char* topic = ptr;
- ptr += strlen(ptr) + 1;
- const char* payload = ptr;
- size_t payload_size = strlen(payload);
- ptr += payload_size + 1;
- mosq_errno = mosquitto_publish(context.mosq, NULL, topic,
- (int)payload_size, payload, 0, false);
- if (mosq_errno != MOSQ_ERR_SUCCESS) {
- Log("Failed to publish mosquitto (%s)",
- mosquitto_strerror(mosq_errno));
- }
- }
- continue;
- }
- if (!ServerMaybeHandle(server, ev.data.fd))
- Terminate("Stray socket in epoll");
- }
- result = EXIT_SUCCESS;
-rollback_lual_newstate:
- lua_close(context.lua_state);
-rollback_mosquitto_connect:
- mosquitto_disconnect(context.mosq);
-rollback_mosquitto_new:
- mosquitto_destroy(context.mosq);
-rollback_mosquitto_lib_init:
- mosquitto_lib_cleanup();
-rollback_server_create:
- ServerDestroy(server);
-rollback_epoll_create:
- tdestroy(context.messages, FreeMessage);
- free(context.buffer);
- close(epfd);
- return result;
-#endif
}