From 5eb80b12008f403b26e9365dbcee911969662e35 Mon Sep 17 00:00:00 2001 From: Mikhail Burakov Date: Sat, 28 Jan 2023 14:56:06 +0100 Subject: Implement keepalive and pings --- main.c | 28 ++++++++++++++++++++++------ toolbox | 2 +- 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/main.c b/main.c index dac79fb..5fbfa89 100644 --- a/main.c +++ b/main.c @@ -612,8 +612,9 @@ int main(int argc, char* argv[]) { LOGE("Failed to connect mqtt socket (%s)", strerror(errno)); goto close_mqtt; } - // TODO(mburakov): Implement keepalive - if (!MqttConnect(g_service.mqtt, 65535)) { + uint64_t now = MillisNow(); + static const uint16_t mqtt_keepalive = UINT16_MAX; + if (!MqttConnect(g_service.mqtt, mqtt_keepalive)) { LOGE("Failed to connect mqtt (%s)", strerror(errno)); goto close_mqtt; } @@ -633,10 +634,25 @@ int main(int argc, char* argv[]) { BufferCreate(&g_service.mqtt_buffer); while (!g_signal) { - enum IoMuxerResult result = IoMuxerIterate(&g_service.io_muxer, -1); - if (result == kIoMuxerResultError && errno != EINTR) { - LOGE("Failed to iterate iomuxer (%s)", strerror(errno)); - OnSignal(SIGABRT); + uint64_t timeout = now + mqtt_keepalive * 1000ull - MillisNow(); + switch (IoMuxerIterate(&g_service.io_muxer, (int)timeout)) { + case kIoMuxerResultSuccess: + break; + case kIoMuxerResultTimeout: + now = MillisNow(); + if (!MqttPing(g_service.mqtt)) { + LOGE("Failed to ping mqtt broker (%s)", strerror(errno)); + OnSignal(SIGABRT); + } + break; + case kIoMuxerResultError: + if (errno != EINTR) { + LOGE("Failed to iterate iomuxer (%s)", strerror(errno)); + OnSignal(SIGABRT); + } + break; + default: + __builtin_unreachable(); } } twalk(g_service.messages, UnrefLuaCallbacks); diff --git a/toolbox b/toolbox index 1b661ca..c9bc1f9 160000 --- a/toolbox +++ b/toolbox @@ -1 +1 @@ -Subproject commit 1b661ca51f4bd42c9aeba8dbedff691d69623263 +Subproject commit c9bc1f972e1e072a0498be8e4e7d258ac9f6c7db -- cgit v1.2.3