From 5eb80b12008f403b26e9365dbcee911969662e35 Mon Sep 17 00:00:00 2001 From: Mikhail Burakov Date: Sat, 28 Jan 2023 14:56:06 +0100 Subject: Implement keepalive and pings --- main.c | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) (limited to 'main.c') diff --git a/main.c b/main.c index dac79fb..5fbfa89 100644 --- a/main.c +++ b/main.c @@ -612,8 +612,9 @@ int main(int argc, char* argv[]) { LOGE("Failed to connect mqtt socket (%s)", strerror(errno)); goto close_mqtt; } - // TODO(mburakov): Implement keepalive - if (!MqttConnect(g_service.mqtt, 65535)) { + uint64_t now = MillisNow(); + static const uint16_t mqtt_keepalive = UINT16_MAX; + if (!MqttConnect(g_service.mqtt, mqtt_keepalive)) { LOGE("Failed to connect mqtt (%s)", strerror(errno)); goto close_mqtt; } @@ -633,10 +634,25 @@ int main(int argc, char* argv[]) { BufferCreate(&g_service.mqtt_buffer); while (!g_signal) { - enum IoMuxerResult result = IoMuxerIterate(&g_service.io_muxer, -1); - if (result == kIoMuxerResultError && errno != EINTR) { - LOGE("Failed to iterate iomuxer (%s)", strerror(errno)); - OnSignal(SIGABRT); + uint64_t timeout = now + mqtt_keepalive * 1000ull - MillisNow(); + switch (IoMuxerIterate(&g_service.io_muxer, (int)timeout)) { + case kIoMuxerResultSuccess: + break; + case kIoMuxerResultTimeout: + now = MillisNow(); + if (!MqttPing(g_service.mqtt)) { + LOGE("Failed to ping mqtt broker (%s)", strerror(errno)); + OnSignal(SIGABRT); + } + break; + case kIoMuxerResultError: + if (errno != EINTR) { + LOGE("Failed to iterate iomuxer (%s)", strerror(errno)); + OnSignal(SIGABRT); + } + break; + default: + __builtin_unreachable(); } } twalk(g_service.messages, UnrefLuaCallbacks); -- cgit v1.2.3