diff options
| author | Mikhail Burakov <mburakov@mailbox.org> | 2021-10-30 16:21:29 +0200 |
|---|---|---|
| committer | Mikhail Burakov <mburakov@mailbox.org> | 2021-10-30 16:21:29 +0200 |
| commit | e9e08557c07eeb98b3c1b07042ee54d0de362974 (patch) | |
| tree | 52c40a25c630d9dcdb25fc905eba63fa4dc58fe0 | |
| parent | 1cdd19dd498a36128fafc01c251ec85d1cd2dafe (diff) | |
Finalize subscribe and publish lua commands
| -rw-r--r-- | main.c | 92 |
1 files changed, 60 insertions, 32 deletions
@@ -38,7 +38,7 @@ struct Message { char* topic; void* payload; size_t payload_size; - void* handler; + int handler; }; struct Context { @@ -47,6 +47,7 @@ struct Context { char* buffer; size_t size; size_t alloc; + lua_State* lua_state; }; static volatile sig_atomic_t g_shutdown; @@ -210,32 +211,32 @@ rollback_tsearch: return NULL; } -static void StoreMessagePayload(void** messages, const char* topic, - const void* payload, size_t payload_size) { - struct Message* message = GetMessage(messages, topic); +static void HandleMosquitto(struct mosquitto* mosq, void* user, + const struct mosquitto_message* mosq_msg) { + (void)mosq; + struct Context* context = user; + struct Message* message = GetMessage(&context->messages, mosq_msg->topic); if (!message) { Log("Failed to get message"); return; } + size_t payload_size = (size_t)mosq_msg->payloadlen; void* buffer = malloc(payload_size); if (!buffer) { Log("Failed to copy payload (%s)", strerror(errno)); return; } - memcpy(buffer, payload, payload_size); + memcpy(buffer, mosq_msg->payload, payload_size); free(message->payload); message->payload = buffer; message->payload_size = payload_size; - return; -} - -static void HandleMosquitto(struct mosquitto* mosq, void* user, - const struct mosquitto_message* message) { - (void)mosq; - struct Context* context = user; - // TODO(mburakov): Call registered lua callback if any. - StoreMessagePayload(&context->messages, message->topic, message->payload, - (size_t)message->payloadlen); + if (message->handler) { + // TODO(mburakov): Handle lua errors. + lua_rawgeti(context->lua_state, LUA_REGISTRYINDEX, message->handler); + lua_pushlstring(context->lua_state, message->payload, + message->payload_size); + lua_pcall(context->lua_state, 1, 0, 0); + } } static void SourceCurrentDir(lua_State* lua_state) { @@ -258,19 +259,30 @@ static void SourceCurrentDir(lua_State* lua_state) { static int LuaSubscribe(lua_State* lua_state) { // TODO(mburakov): Handle lua errors. struct Context* context = lua_touserdata(lua_state, lua_upvalueindex(1)); + const char* topic = lua_tolstring(lua_state, -2, NULL); + struct Message* message = GetMessage(&context->messages, topic); + if (!message) { + Log("Failed to get message"); + return 0; + } + message->handler = luaL_ref(lua_state, LUA_REGISTRYINDEX); return 0; } static int LuaPublish(lua_State* lua_state) { // TODO(mburakov): Handle lua errors. struct Context* context = lua_touserdata(lua_state, lua_upvalueindex(1)); - size_t payload_size; - const char* topic = lua_tolstring(lua_state, -2, NULL); + char* buffer = context->buffer; + size_t topic_size, payload_size; + const char* topic = lua_tolstring(lua_state, -2, &topic_size); const char* payload = lua_tolstring(lua_state, -1, &payload_size); - int mosq_errno = mosquitto_publish(context->mosq, NULL, topic, - (int)payload_size, payload, 0, false); - if (mosq_errno != MOSQ_ERR_SUCCESS) - Log("Failed to publish to mosquitto (%s)", mosquitto_strerror(mosq_errno)); + // mburakov: Not allowed to publish from mosquitto callback. + if (!BufferAppend(context, topic, topic_size + 1) || + !BufferAppend(context, payload, payload_size + 1)) { + Log("Failed to schedule mosquitto publish"); + context->buffer = buffer; + return 0; + } return 0; } @@ -321,20 +333,20 @@ int main(int argc, char* argv[]) { Log("Failed to add mosquitto to epoll (%s)", strerror(errno)); goto rollback_mosquitto_connect; } - lua_State* lua_state = luaL_newstate(); - if (!lua_state) { + context.lua_state = luaL_newstate(); + if (!context.lua_state) { Log("Failed to create lua state"); goto rollback_mosquitto_connect; } - luaL_openlibs(lua_state); + luaL_openlibs(context.lua_state); // TODO(mburakov): Handle lua errors. - lua_pushlightuserdata(lua_state, &context); - lua_pushcclosure(lua_state, LuaSubscribe, 1); - lua_setglobal(lua_state, "subscribe"); - lua_pushlightuserdata(lua_state, &context); - lua_pushcclosure(lua_state, LuaPublish, 1); - lua_setglobal(lua_state, "publish"); - SourceCurrentDir(lua_state); + lua_pushlightuserdata(context.lua_state, &context); + lua_pushcclosure(context.lua_state, LuaSubscribe, 1); + lua_setglobal(context.lua_state, "subscribe"); + lua_pushlightuserdata(context.lua_state, &context); + lua_pushcclosure(context.lua_state, LuaPublish, 1); + lua_setglobal(context.lua_state, "publish"); + SourceCurrentDir(context.lua_state); static const struct sigaction sa = {.sa_handler = OnSignal}; if (sigaction(SIGINT, &sa, NULL) || sigaction(SIGTERM, &sa, NULL)) { Log("Failed to set up signal handlers (%s)", strerror(errno)); @@ -357,11 +369,27 @@ int main(int argc, char* argv[]) { break; } if (ev.data.fd == mosq_sock) { + context.size = 0; mosq_errno = mosquitto_loop_read(context.mosq, 1); if (mosq_errno != MOSQ_ERR_SUCCESS) { Log("Failed to read mosquitto (%s)", mosquitto_strerror(mosq_errno)); goto rollback_lual_newstate; } + // mburakov: Execute scheduled publishes if any. + for (const char* ptr = context.buffer; + ptr < context.buffer + context.size;) { + const char* topic = ptr; + ptr += strlen(ptr) + 1; + const char* payload = ptr; + size_t payload_size = strlen(payload); + ptr += payload_size + 1; + mosq_errno = mosquitto_publish(context.mosq, NULL, topic, + (int)payload_size, payload, 0, false); + if (mosq_errno != MOSQ_ERR_SUCCESS) { + Log("Failed to publish mosquitto (%s)", + mosquitto_strerror(mosq_errno)); + } + } continue; } if (!ServerMaybeHandle(server, ev.data.fd)) @@ -369,7 +397,7 @@ int main(int argc, char* argv[]) { } result = EXIT_SUCCESS; rollback_lual_newstate: - lua_close(lua_state); + lua_close(context.lua_state); rollback_mosquitto_connect: mosquitto_disconnect(context.mosq); rollback_mosquitto_new: |
