diff options
| author | Mikhail Burakov <mburakov@mailbox.org> | 2023-01-28 14:56:06 +0100 |
|---|---|---|
| committer | Mikhail Burakov <mburakov@mailbox.org> | 2023-01-28 14:56:06 +0100 |
| commit | 5eb80b12008f403b26e9365dbcee911969662e35 (patch) | |
| tree | cfa83e707bcb5e998b5c3a1e273c24a195c03269 /main.c | |
| parent | 86b2ccc6577579f7abbc22d73721858f15e51ac3 (diff) | |
Implement keepalive and pings
Diffstat (limited to 'main.c')
| -rw-r--r-- | main.c | 28 |
1 files changed, 22 insertions, 6 deletions
@@ -612,8 +612,9 @@ int main(int argc, char* argv[]) { LOGE("Failed to connect mqtt socket (%s)", strerror(errno)); goto close_mqtt; } - // TODO(mburakov): Implement keepalive - if (!MqttConnect(g_service.mqtt, 65535)) { + uint64_t now = MillisNow(); + static const uint16_t mqtt_keepalive = UINT16_MAX; + if (!MqttConnect(g_service.mqtt, mqtt_keepalive)) { LOGE("Failed to connect mqtt (%s)", strerror(errno)); goto close_mqtt; } @@ -633,10 +634,25 @@ int main(int argc, char* argv[]) { BufferCreate(&g_service.mqtt_buffer); while (!g_signal) { - enum IoMuxerResult result = IoMuxerIterate(&g_service.io_muxer, -1); - if (result == kIoMuxerResultError && errno != EINTR) { - LOGE("Failed to iterate iomuxer (%s)", strerror(errno)); - OnSignal(SIGABRT); + uint64_t timeout = now + mqtt_keepalive * 1000ull - MillisNow(); + switch (IoMuxerIterate(&g_service.io_muxer, (int)timeout)) { + case kIoMuxerResultSuccess: + break; + case kIoMuxerResultTimeout: + now = MillisNow(); + if (!MqttPing(g_service.mqtt)) { + LOGE("Failed to ping mqtt broker (%s)", strerror(errno)); + OnSignal(SIGABRT); + } + break; + case kIoMuxerResultError: + if (errno != EINTR) { + LOGE("Failed to iterate iomuxer (%s)", strerror(errno)); + OnSignal(SIGABRT); + } + break; + default: + __builtin_unreachable(); } } twalk(g_service.messages, UnrefLuaCallbacks); |
