summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMikhail Burakov <mburakov@mailbox.org>2021-10-30 16:21:29 +0200
committerMikhail Burakov <mburakov@mailbox.org>2021-10-30 16:21:29 +0200
commite9e08557c07eeb98b3c1b07042ee54d0de362974 (patch)
tree52c40a25c630d9dcdb25fc905eba63fa4dc58fe0
parent1cdd19dd498a36128fafc01c251ec85d1cd2dafe (diff)
Finalize subscribe and publish lua commands
-rw-r--r--main.c92
1 files changed, 60 insertions, 32 deletions
diff --git a/main.c b/main.c
index d59122a..5ac1678 100644
--- a/main.c
+++ b/main.c
@@ -38,7 +38,7 @@ struct Message {
char* topic;
void* payload;
size_t payload_size;
- void* handler;
+ int handler;
};
struct Context {
@@ -47,6 +47,7 @@ struct Context {
char* buffer;
size_t size;
size_t alloc;
+ lua_State* lua_state;
};
static volatile sig_atomic_t g_shutdown;
@@ -210,32 +211,32 @@ rollback_tsearch:
return NULL;
}
-static void StoreMessagePayload(void** messages, const char* topic,
- const void* payload, size_t payload_size) {
- struct Message* message = GetMessage(messages, topic);
+static void HandleMosquitto(struct mosquitto* mosq, void* user,
+ const struct mosquitto_message* mosq_msg) {
+ (void)mosq;
+ struct Context* context = user;
+ struct Message* message = GetMessage(&context->messages, mosq_msg->topic);
if (!message) {
Log("Failed to get message");
return;
}
+ size_t payload_size = (size_t)mosq_msg->payloadlen;
void* buffer = malloc(payload_size);
if (!buffer) {
Log("Failed to copy payload (%s)", strerror(errno));
return;
}
- memcpy(buffer, payload, payload_size);
+ memcpy(buffer, mosq_msg->payload, payload_size);
free(message->payload);
message->payload = buffer;
message->payload_size = payload_size;
- return;
-}
-
-static void HandleMosquitto(struct mosquitto* mosq, void* user,
- const struct mosquitto_message* message) {
- (void)mosq;
- struct Context* context = user;
- // TODO(mburakov): Call registered lua callback if any.
- StoreMessagePayload(&context->messages, message->topic, message->payload,
- (size_t)message->payloadlen);
+ if (message->handler) {
+ // TODO(mburakov): Handle lua errors.
+ lua_rawgeti(context->lua_state, LUA_REGISTRYINDEX, message->handler);
+ lua_pushlstring(context->lua_state, message->payload,
+ message->payload_size);
+ lua_pcall(context->lua_state, 1, 0, 0);
+ }
}
static void SourceCurrentDir(lua_State* lua_state) {
@@ -258,19 +259,30 @@ static void SourceCurrentDir(lua_State* lua_state) {
static int LuaSubscribe(lua_State* lua_state) {
// TODO(mburakov): Handle lua errors.
struct Context* context = lua_touserdata(lua_state, lua_upvalueindex(1));
+ const char* topic = lua_tolstring(lua_state, -2, NULL);
+ struct Message* message = GetMessage(&context->messages, topic);
+ if (!message) {
+ Log("Failed to get message");
+ return 0;
+ }
+ message->handler = luaL_ref(lua_state, LUA_REGISTRYINDEX);
return 0;
}
static int LuaPublish(lua_State* lua_state) {
// TODO(mburakov): Handle lua errors.
struct Context* context = lua_touserdata(lua_state, lua_upvalueindex(1));
- size_t payload_size;
- const char* topic = lua_tolstring(lua_state, -2, NULL);
+ char* buffer = context->buffer;
+ size_t topic_size, payload_size;
+ const char* topic = lua_tolstring(lua_state, -2, &topic_size);
const char* payload = lua_tolstring(lua_state, -1, &payload_size);
- int mosq_errno = mosquitto_publish(context->mosq, NULL, topic,
- (int)payload_size, payload, 0, false);
- if (mosq_errno != MOSQ_ERR_SUCCESS)
- Log("Failed to publish to mosquitto (%s)", mosquitto_strerror(mosq_errno));
+ // mburakov: Not allowed to publish from mosquitto callback.
+ if (!BufferAppend(context, topic, topic_size + 1) ||
+ !BufferAppend(context, payload, payload_size + 1)) {
+ Log("Failed to schedule mosquitto publish");
+ context->buffer = buffer;
+ return 0;
+ }
return 0;
}
@@ -321,20 +333,20 @@ int main(int argc, char* argv[]) {
Log("Failed to add mosquitto to epoll (%s)", strerror(errno));
goto rollback_mosquitto_connect;
}
- lua_State* lua_state = luaL_newstate();
- if (!lua_state) {
+ context.lua_state = luaL_newstate();
+ if (!context.lua_state) {
Log("Failed to create lua state");
goto rollback_mosquitto_connect;
}
- luaL_openlibs(lua_state);
+ luaL_openlibs(context.lua_state);
// TODO(mburakov): Handle lua errors.
- lua_pushlightuserdata(lua_state, &context);
- lua_pushcclosure(lua_state, LuaSubscribe, 1);
- lua_setglobal(lua_state, "subscribe");
- lua_pushlightuserdata(lua_state, &context);
- lua_pushcclosure(lua_state, LuaPublish, 1);
- lua_setglobal(lua_state, "publish");
- SourceCurrentDir(lua_state);
+ lua_pushlightuserdata(context.lua_state, &context);
+ lua_pushcclosure(context.lua_state, LuaSubscribe, 1);
+ lua_setglobal(context.lua_state, "subscribe");
+ lua_pushlightuserdata(context.lua_state, &context);
+ lua_pushcclosure(context.lua_state, LuaPublish, 1);
+ lua_setglobal(context.lua_state, "publish");
+ SourceCurrentDir(context.lua_state);
static const struct sigaction sa = {.sa_handler = OnSignal};
if (sigaction(SIGINT, &sa, NULL) || sigaction(SIGTERM, &sa, NULL)) {
Log("Failed to set up signal handlers (%s)", strerror(errno));
@@ -357,11 +369,27 @@ int main(int argc, char* argv[]) {
break;
}
if (ev.data.fd == mosq_sock) {
+ context.size = 0;
mosq_errno = mosquitto_loop_read(context.mosq, 1);
if (mosq_errno != MOSQ_ERR_SUCCESS) {
Log("Failed to read mosquitto (%s)", mosquitto_strerror(mosq_errno));
goto rollback_lual_newstate;
}
+ // mburakov: Execute scheduled publishes if any.
+ for (const char* ptr = context.buffer;
+ ptr < context.buffer + context.size;) {
+ const char* topic = ptr;
+ ptr += strlen(ptr) + 1;
+ const char* payload = ptr;
+ size_t payload_size = strlen(payload);
+ ptr += payload_size + 1;
+ mosq_errno = mosquitto_publish(context.mosq, NULL, topic,
+ (int)payload_size, payload, 0, false);
+ if (mosq_errno != MOSQ_ERR_SUCCESS) {
+ Log("Failed to publish mosquitto (%s)",
+ mosquitto_strerror(mosq_errno));
+ }
+ }
continue;
}
if (!ServerMaybeHandle(server, ev.data.fd))
@@ -369,7 +397,7 @@ int main(int argc, char* argv[]) {
}
result = EXIT_SUCCESS;
rollback_lual_newstate:
- lua_close(lua_state);
+ lua_close(context.lua_state);
rollback_mosquitto_connect:
mosquitto_disconnect(context.mosq);
rollback_mosquitto_new: