From f593c2fd65cab3275c98c100c6b7d775e24157f9 Mon Sep 17 00:00:00 2001
From: Mikhail Burakov <mburakov@mailbox.org>
Date: Sun, 5 May 2024 12:51:07 +0200
Subject: Initial primitive audio pipeline implementation

---
 atomic_queue.c |  81 +++++++++++++++++++++++++++++++++++
 atomic_queue.h |  40 ++++++++++++++++++
 audio.c        | 131 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 audio.h        |  31 ++++++++++++++
 main.c         |  44 +++++++++++++++++--
 makefile       |   1 +
 6 files changed, 325 insertions(+), 3 deletions(-)
 create mode 100644 atomic_queue.c
 create mode 100644 atomic_queue.h
 create mode 100644 audio.c
 create mode 100644 audio.h

diff --git a/atomic_queue.c b/atomic_queue.c
new file mode 100644
index 0000000..898db3e
--- /dev/null
+++ b/atomic_queue.c
@@ -0,0 +1,81 @@
+/*
+ * Copyright (C) 2024 Mikhail Burakov. This file is part of receiver.
+ *
+ * receiver is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * receiver is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with receiver.  If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "atomic_queue.h"
+
+#include <stdint.h>
+#include <stdlib.h>
+#include <string.h>
+
+static size_t min(size_t a, size_t b) { return a < b ? a : b; }
+static size_t min3(size_t a, size_t b, size_t c) { return min(min(a, b), c); }
+
+bool AtomicQueueCreate(struct AtomicQueue* atomic_queue, size_t alloc) {
+  void* buffer = malloc(alloc);
+  if (!buffer) return false;
+  *atomic_queue = (struct AtomicQueue){
+      .buffer = buffer,
+      .alloc = alloc,
+  };
+  atomic_init(&atomic_queue->size, 0);
+  return true;
+}
+
+size_t AtomicQueueWrite(struct AtomicQueue* atomic_queue, const void* buffer,
+                        size_t size) {
+  size_t capacity =
+      atomic_queue->alloc -
+      atomic_load_explicit(&atomic_queue->size, memory_order_acquire);
+
+  size_t tail_size = atomic_queue->alloc - atomic_queue->write;
+  size_t copy_size = min3(size, capacity, tail_size);
+  memcpy((uint8_t*)atomic_queue->buffer + atomic_queue->write, buffer,
+         copy_size);
+
+  size_t offset = copy_size;
+  copy_size = min(size - copy_size, capacity - copy_size);
+  memcpy(atomic_queue->buffer, (const uint8_t*)buffer + offset, copy_size);
+
+  offset += copy_size;
+  atomic_queue->write = (atomic_queue->write + offset) % atomic_queue->alloc;
+  atomic_fetch_add_explicit(&atomic_queue->size, offset, memory_order_release);
+  return offset;
+}
+
+size_t AtomicQueueRead(struct AtomicQueue* atomic_queue, void* buffer,
+                       size_t size) {
+  size_t avail =
+      atomic_load_explicit(&atomic_queue->size, memory_order_acquire);
+
+  size_t tail_size = atomic_queue->alloc - atomic_queue->read;
+  size_t copy_size = min3(size, avail, tail_size);
+  memcpy(buffer, (const uint8_t*)atomic_queue->buffer + atomic_queue->read,
+         copy_size);
+
+  size_t offset = copy_size;
+  copy_size = min(size - copy_size, avail - copy_size);
+  memcpy((uint8_t*)buffer + offset, atomic_queue->buffer, copy_size);
+
+  offset += copy_size;
+  atomic_queue->read = (atomic_queue->read + offset) % atomic_queue->alloc;
+  atomic_fetch_sub_explicit(&atomic_queue->size, offset, memory_order_release);
+  return offset;
+}
+
+void AtomicQueueDestroy(struct AtomicQueue* atomic_queue) {
+  free(atomic_queue->buffer);
+}
diff --git a/atomic_queue.h b/atomic_queue.h
new file mode 100644
index 0000000..50bb66d
--- /dev/null
+++ b/atomic_queue.h
@@ -0,0 +1,40 @@
+/*
+ * Copyright (C) 2024 Mikhail Burakov. This file is part of receiver.
+ *
+ * receiver is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * receiver is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with receiver.  If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#ifndef ATOMIC_QUEUE_H_
+#define ATOMIC_QUEUE_H_
+
+#include <stdatomic.h>
+#include <stdbool.h>
+#include <stddef.h>
+
+struct AtomicQueue {
+  void* buffer;
+  size_t alloc;
+  size_t read;
+  size_t write;
+  atomic_size_t size;
+};
+
+bool AtomicQueueCreate(struct AtomicQueue* atomic_queue, size_t alloc);
+size_t AtomicQueueWrite(struct AtomicQueue* atomic_queue, const void* buffer,
+                        size_t size);
+size_t AtomicQueueRead(struct AtomicQueue* atomic_queue, void* buffer,
+                       size_t size);
+void AtomicQueueDestroy(struct AtomicQueue* atomic_queue);
+
+#endif  // ATOMIC_QUEUE_H_
diff --git a/audio.c b/audio.c
new file mode 100644
index 0000000..bb5ef23
--- /dev/null
+++ b/audio.c
@@ -0,0 +1,131 @@
+/*
+ * Copyright (C) 2024 Mikhail Burakov. This file is part of receiver.
+ *
+ * receiver is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * receiver is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with receiver.  If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "audio.h"
+
+#include <alsa/asoundlib.h>
+#include <errno.h>
+#include <stdatomic.h>
+#include <stddef.h>
+#include <stdint.h>
+#include <string.h>
+#include <threads.h>
+
+#include "atomic_queue.h"
+#include "toolbox/utils.h"
+
+struct AudioContext {
+  const char* device;
+  atomic_bool running;
+  struct AtomicQueue queue;
+  thrd_t thread;
+};
+
+static int AudioContextThreadProc(void* arg) {
+  struct AudioContext* context = arg;
+
+  snd_pcm_t* pcm = NULL;
+  int err = snd_pcm_open(&pcm, context->device, SND_PCM_STREAM_PLAYBACK, 0);
+  if (err) {
+    LOG("Failed to open pcm (%s)", snd_strerror(err));
+    atomic_store_explicit(&context->running, 0, memory_order_relaxed);
+    return 0;
+  }
+
+  // TODO(mburakov): Read audio configuration from the server.
+  err = snd_pcm_set_params(pcm, SND_PCM_FORMAT_S16_LE,
+                           SND_PCM_ACCESS_RW_INTERLEAVED, 2, 48000, 1, 10000);
+  if (err) {
+    LOG("Failed to set pcm params (%s)", snd_strerror(err));
+    atomic_store_explicit(&context->running, 0, memory_order_relaxed);
+    goto rollback_pcm;
+  }
+
+  while (atomic_load_explicit(&context->running, memory_order_relaxed)) {
+    // TODO(mburakov): Frame size depends on dynamic audio configuration.
+    static const unsigned frame_size = sizeof(int16_t) * 2;
+    uint8_t buffer[480 * frame_size];
+
+    size_t size = AtomicQueueRead(&context->queue, buffer, sizeof(buffer));
+    if (size < sizeof(buffer)) {
+      // LOG("Audio queue underflow!");
+      memset(buffer + size, 0, sizeof(buffer) - size);
+    }
+
+    for (snd_pcm_uframes_t offset = 0; offset < sizeof(buffer) / frame_size;) {
+      snd_pcm_sframes_t nframes =
+          snd_pcm_writei(pcm, buffer + offset * frame_size,
+                         sizeof(buffer) / frame_size - offset);
+      if (nframes < 0) {
+        LOG("Failed to write pcm (%s)", snd_strerror((int)nframes));
+        atomic_store_explicit(&context->running, 0, memory_order_relaxed);
+        goto rollback_pcm;
+      }
+      offset += (snd_pcm_uframes_t)nframes;
+    }
+  }
+
+rollback_pcm:
+  snd_pcm_close(pcm);
+  return 0;
+}
+
+struct AudioContext* AudioContextCreate(const char* device) {
+  struct AudioContext* audio_context = malloc(sizeof(struct AudioContext));
+  if (!audio_context) {
+    LOG("Failed to allocate context (%s)", strerror(errno));
+    return NULL;
+  }
+
+  audio_context->device = device;
+  atomic_init(&audio_context->running, 1);
+  if (!AtomicQueueCreate(&audio_context->queue, 4800 * sizeof(int16_t) * 2)) {
+    LOG("Failed to create queue (%s)", strerror(errno));
+    goto rollback_context;
+  }
+
+  if (thrd_create(&audio_context->thread, AudioContextThreadProc,
+                  audio_context) != thrd_success) {
+    LOG("Failed to create thread (%s)", strerror(errno));
+    goto rollback_queue;
+  }
+  return audio_context;
+
+rollback_queue:
+  AtomicQueueDestroy(&audio_context->queue);
+rollback_context:
+  free(audio_context);
+  return NULL;
+}
+
+bool AudioContextDecode(struct AudioContext* audio_context, const void* buffer,
+                        size_t size) {
+  if (!atomic_load_explicit(&audio_context->running, memory_order_relaxed)) {
+    LOG("Audio thread was stopped early!");
+    return false;
+  }
+  if (AtomicQueueWrite(&audio_context->queue, buffer, size) < size)
+    LOG("Audio queue overflow!");
+  return true;
+}
+
+void AudioContextDestroy(struct AudioContext* audio_context) {
+  atomic_store_explicit(&audio_context->running, 0, memory_order_relaxed);
+  thrd_join(audio_context->thread, NULL);
+  AtomicQueueDestroy(&audio_context->queue);
+  free(audio_context);
+}
diff --git a/audio.h b/audio.h
new file mode 100644
index 0000000..6755b64
--- /dev/null
+++ b/audio.h
@@ -0,0 +1,31 @@
+/*
+ * Copyright (C) 2024 Mikhail Burakov. This file is part of receiver.
+ *
+ * receiver is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * receiver is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with receiver.  If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#ifndef RECEIVER_AUDIO_H_
+#define RECEIVER_AUDIO_H_
+
+#include <stdbool.h>
+#include <stddef.h>
+
+struct AudioContext;
+
+struct AudioContext* AudioContextCreate(const char* device);
+bool AudioContextDecode(struct AudioContext* audio_context, const void* buffer,
+                        size_t size);
+void AudioContextDestroy(struct AudioContext* audio_context);
+
+#endif  // RECEIVER_AUDIO_H_
diff --git a/main.c b/main.c
index 89f6938..16fd009 100644
--- a/main.c
+++ b/main.c
@@ -30,6 +30,7 @@
 #include <sys/timerfd.h>
 #include <unistd.h>
 
+#include "audio.h"
 #include "decode.h"
 #include "input.h"
 #include "proto.h"
@@ -49,6 +50,7 @@ struct Context {
   size_t overlay_height;
   struct Overlay* overlay;
   struct DecodeContext* decode_context;
+  struct AudioContext* audio_context;
   struct Buffer buffer;
 
   size_t video_bitstream;
@@ -142,7 +144,8 @@ static void GetMaxOverlaySize(size_t* width, size_t* height) {
   *height = 4 + 12 * 3 + 4;
 }
 
-static struct Context* ContextCreate(int sock, bool no_input, bool stats) {
+static struct Context* ContextCreate(int sock, bool no_input, bool stats,
+                                     const char* audio_device) {
   struct Context* context = calloc(1, sizeof(struct Context));
   if (!context) {
     LOG("Failed to allocate context (%s)", strerror(errno));
@@ -190,8 +193,18 @@ static struct Context* ContextCreate(int sock, bool no_input, bool stats) {
     LOG("Failed to create decode context");
     goto rollback_overlay;
   }
+
+  if (audio_device) {
+    context->audio_context = AudioContextCreate(audio_device);
+    if (!context->audio_context) {
+      LOG("Failed to create audio context");
+      goto rollback_decode_context;
+    }
+  }
   return context;
 
+rollback_decode_context:
+  DecodeContextDestroy(context->decode_context);
 rollback_overlay:
   if (context->overlay) OverlayDestroy(context->overlay);
 rollback_window:
@@ -284,6 +297,17 @@ static bool HandleVideoStream(struct Context* context) {
   return true;
 }
 
+static bool HandleAudioStream(struct Context* context) {
+  const struct Proto* proto = context->buffer.data;
+  if (!context->audio_context) return true;
+  if (!AudioContextDecode(context->audio_context, proto->data, proto->size)) {
+    LOG("Failed to decode incoming audio data");
+    return false;
+  }
+
+  return true;
+}
+
 static bool DemuxProtoStream(int sock, struct Context* context) {
   switch (BufferAppendFrom(&context->buffer, sock)) {
     case -1:
@@ -313,6 +337,11 @@ again:
         return false;
       }
       break;
+    case PROTO_TYPE_AUDIO:
+      if (!HandleAudioStream(context)) {
+        LOG("Failed to handle audio stream");
+        return false;
+      }
   }
 
   BufferDiscard(&context->buffer, sizeof(struct Proto) + proto->size);
@@ -344,6 +373,7 @@ static bool SendPingMessage(int sock, int timer_fd, struct Context* context) {
 
 static void ContextDestroy(struct Context* context) {
   BufferDestroy(&context->buffer);
+  if (context->audio_context) AudioContextDestroy(context->audio_context);
   DecodeContextDestroy(context->decode_context);
   if (context->overlay) OverlayDestroy(context->overlay);
   WindowDestroy(context->window);
@@ -352,7 +382,8 @@ static void ContextDestroy(struct Context* context) {
 
 int main(int argc, char* argv[]) {
   if (argc < 2) {
-    LOG("Usage: %s <ip>:<port> [--no-input] [--stats]", argv[0]);
+    LOG("Usage: %s <ip>:<port> [--no-input] [--stats] [--audio <device>]",
+        argv[0]);
     return EXIT_FAILURE;
   }
 
@@ -364,15 +395,22 @@ int main(int argc, char* argv[]) {
 
   bool no_input = false;
   bool stats = false;
+  const char* audio_device = NULL;
   for (int i = 2; i < argc; i++) {
     if (!strcmp(argv[i], "--no-input")) {
       no_input = true;
     } else if (!strcmp(argv[i], "--stats")) {
       stats = true;
+    } else if (!strcmp(argv[i], "--audio")) {
+      audio_device = argv[++i];
+      if (i == argc) {
+        LOG("Audio argument requires a value");
+        return EXIT_FAILURE;
+      }
     }
   }
 
-  struct Context* context = ContextCreate(sock, no_input, stats);
+  struct Context* context = ContextCreate(sock, no_input, stats, audio_device);
   if (!context) {
     LOG("Failed to create context");
     goto rollback_socket;
diff --git a/makefile b/makefile
index 2eefc20..ee4b9b1 100644
--- a/makefile
+++ b/makefile
@@ -10,6 +10,7 @@ obj+=\
 	toolbox/perf.o
 
 libs:=\
+	alsa \
 	libva \
 	libva-drm \
 	mfx \
-- 
cgit v1.2.3