From 918682faa29197fe3b0a544906638e02785a6b17 Mon Sep 17 00:00:00 2001
From: Mikhail Burakov <mburakov@mailbox.org>
Date: Thu, 9 May 2024 14:47:46 +0200
Subject: Initial audio capturing via virtual pipewire sink

---
 audio.c        | 279 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 audio.h        |  36 ++++++++
 buffer_queue.c | 100 +++++++++++++++++++++
 buffer_queue.h |  42 +++++++++
 main.c         |  52 ++++++++++-
 makefile       |   1 +
 6 files changed, 509 insertions(+), 1 deletion(-)
 create mode 100644 audio.c
 create mode 100644 audio.h
 create mode 100644 buffer_queue.c
 create mode 100644 buffer_queue.h

diff --git a/audio.c b/audio.c
new file mode 100644
index 0000000..358382a
--- /dev/null
+++ b/audio.c
@@ -0,0 +1,279 @@
+/*
+ * Copyright (C) 2024 Mikhail Burakov. This file is part of streamer.
+ *
+ * streamer is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * streamer is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with streamer.  If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "audio.h"
+
+#include <errno.h>
+#include <pipewire/pipewire.h>
+#include <spa/param/audio/raw-utils.h>
+#include <spa/utils/result.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include "buffer_queue.h"
+#include "toolbox/utils.h"
+
+#define STATUS_OK 0
+#define STATUS_ERR 1
+
+struct AudioContext {
+  const struct AudioContextCallbacks* callbacks;
+  void* user;
+
+  int waker[2];
+  struct BufferQueue* buffer_queue;
+  struct pw_thread_loop* pw_thread_loop;
+  struct pw_stream* pw_stream;
+};
+
+static void WakeClient(const struct AudioContext* audio_context, char status) {
+  if (write(audio_context->waker[1], &status, sizeof(status)) !=
+      sizeof(status)) {
+    // TODO(mburakov): Then what?..
+    abort();
+  }
+}
+
+static void OnStreamStateChanged(void* data, enum pw_stream_state old,
+                                 enum pw_stream_state state,
+                                 const char* error) {
+  (void)data;
+  LOG("Stream state change %s->%s, error is %s", pw_stream_state_as_string(old),
+      pw_stream_state_as_string(state), error);
+}
+
+static void OnStreamParamChanged(void* data, uint32_t id,
+                                 const struct spa_pod* param) {
+  struct AudioContext* audio_context = data;
+  if (param == NULL || id != SPA_PARAM_Format) return;
+
+  struct spa_audio_info audio_info;
+  if (spa_format_parse(param, &audio_info.media_type,
+                       &audio_info.media_subtype) < 0) {
+    LOG("Failed to parse stream format");
+    goto failure;
+  }
+  if (audio_info.media_type != SPA_MEDIA_TYPE_audio ||
+      audio_info.media_subtype != SPA_MEDIA_SUBTYPE_raw) {
+    LOG("Unexpected stream format");
+    goto failure;
+  }
+  if (spa_format_audio_raw_parse(param, &audio_info.info.raw) < 0) {
+    LOG("Faield to parse audio stream format");
+    goto failure;
+  }
+
+  LOG("Capturing rate: %u, channels: %u", audio_info.info.raw.rate,
+      audio_info.info.raw.channels);
+  return;
+
+failure:
+  pw_thread_loop_stop(audio_context->pw_thread_loop);
+  WakeClient(audio_context, STATUS_ERR);
+}
+
+static void OnStreamProcess(void* data) {
+  struct AudioContext* audio_context = data;
+  struct pw_buffer* pw_buffer =
+      pw_stream_dequeue_buffer(audio_context->pw_stream);
+  if (!pw_buffer) {
+    LOG("Failed to dequeue stream buffer");
+    goto failure;
+  }
+
+  for (uint32_t i = 0; i < pw_buffer->buffer->n_datas; i++) {
+    const struct spa_data* spa_data = pw_buffer->buffer->datas + i;
+    const void* buffer = (const uint8_t*)spa_data->data +
+                         spa_data->chunk->offset % spa_data->maxsize;
+    uint32_t size = MIN(spa_data->chunk->size, spa_data->maxsize);
+    struct BufferQueueItem* buffer_queue_item =
+        BufferQueueItemCreate(buffer, size);
+    if (!buffer_queue_item) {
+      LOG("Failed to copy stream buffer");
+      goto failure;
+    }
+    if (!BufferQueueQueue(audio_context->buffer_queue, buffer_queue_item)) {
+      LOG("Failed to queue stream buffer copy");
+      BufferQueueItemDestroy(buffer_queue_item);
+      goto failure;
+    }
+  }
+
+  pw_stream_queue_buffer(audio_context->pw_stream, pw_buffer);
+  WakeClient(audio_context, STATUS_OK);
+  return;
+
+failure:
+  pw_thread_loop_stop(audio_context->pw_thread_loop);
+  WakeClient(audio_context, STATUS_ERR);
+}
+
+struct AudioContext* AudioContextCreate(
+    const struct AudioContextCallbacks* callbacks, void* user) {
+  pw_init(0, NULL);
+  struct AudioContext* audio_context = malloc(sizeof(struct AudioContext));
+  if (!audio_context) {
+    LOG("Failed to allocate audio context (%s)", strerror(errno));
+    return NULL;
+  }
+  *audio_context = (struct AudioContext){
+      .callbacks = callbacks,
+      .user = user,
+  };
+
+  if (pipe(audio_context->waker)) {
+    LOG("Failed to create pipe (%s)", strerror(errno));
+    goto rollback_audio_context;
+  }
+
+  audio_context->buffer_queue = BufferQueueCreate();
+  if (!audio_context->buffer_queue) {
+    LOG("Failed to create buffer queue (%s)", strerror(errno));
+    goto rollback_waker;
+  }
+
+  audio_context->pw_thread_loop = pw_thread_loop_new("audio-capture", NULL);
+  if (!audio_context->pw_thread_loop) {
+    LOG("Failed to create pipewire thread loop");
+    goto rollback_buffer_queue;
+  }
+
+  pw_thread_loop_lock(audio_context->pw_thread_loop);
+  int err = pw_thread_loop_start(audio_context->pw_thread_loop);
+  if (err) {
+    LOG("Failed to start pipewire thread loop (%s)", spa_strerror(err));
+    pw_thread_loop_unlock(audio_context->pw_thread_loop);
+    goto rollback_thread_loop;
+  }
+
+  // TOOD(mburakov): Read these from the commandline?
+  struct pw_properties* pw_properties = pw_properties_new(
+#define _(...) __VA_ARGS__
+      _(PW_KEY_AUDIO_FORMAT, "S16LE"), _(PW_KEY_AUDIO_RATE, "48000"),
+      _(PW_KEY_AUDIO_CHANNELS, "2"), _(SPA_KEY_AUDIO_POSITION, "[ FL FR ]"),
+      _(PW_KEY_NODE_NAME, "streamer-sink"), _(PW_KEY_NODE_VIRTUAL, "true"),
+      _(PW_KEY_MEDIA_CLASS, "Audio/Sink"), NULL
+#undef _
+  );
+  if (!pw_properties) {
+    LOG("Failed to create pipewire properties");
+    pw_thread_loop_unlock(audio_context->pw_thread_loop);
+    goto rollback_thread_loop;
+  }
+
+  static const struct pw_stream_events kPwStreamEvents = {
+      .version = PW_VERSION_STREAM_EVENTS,
+      .state_changed = OnStreamStateChanged,
+      .param_changed = OnStreamParamChanged,
+      .process = OnStreamProcess,
+  };
+  audio_context->pw_stream = pw_stream_new_simple(
+      pw_thread_loop_get_loop(audio_context->pw_thread_loop), "audio-capture",
+      pw_properties, &kPwStreamEvents, audio_context);
+  if (!audio_context->pw_stream) {
+    LOG("Failed to create pipewire stream");
+    pw_thread_loop_unlock(audio_context->pw_thread_loop);
+    goto rollback_thread_loop;
+  }
+
+  uint8_t buffer[1024];
+  struct spa_pod_builder spa_pod_builder =
+      SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
+  const struct spa_pod* params[] = {
+      spa_format_audio_raw_build(
+          &spa_pod_builder, SPA_PARAM_EnumFormat,
+          &SPA_AUDIO_INFO_RAW_INIT(.format = SPA_AUDIO_FORMAT_S16_LE,
+                                   .rate = 48000, .channels = 2,
+                                   .position = {SPA_AUDIO_CHANNEL_FL,
+                                                SPA_AUDIO_CHANNEL_FR})),
+  };
+  static const enum pw_stream_flags kPwStreamFlags =
+      PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_MAP_BUFFERS |
+      PW_STREAM_FLAG_RT_PROCESS;
+  if (pw_stream_connect(audio_context->pw_stream, PW_DIRECTION_INPUT, PW_ID_ANY,
+                        kPwStreamFlags, params, LENGTH(params))) {
+    LOG("Failed to connect pipewire stream");
+    pw_stream_destroy(audio_context->pw_stream);
+    pw_thread_loop_unlock(audio_context->pw_thread_loop);
+    goto rollback_thread_loop;
+  }
+
+  pw_thread_loop_unlock(audio_context->pw_thread_loop);
+  return audio_context;
+
+rollback_thread_loop:
+  pw_thread_loop_destroy(audio_context->pw_thread_loop);
+rollback_buffer_queue:
+  BufferQueueDestroy(audio_context->buffer_queue);
+rollback_waker:
+  close(audio_context->waker[1]);
+  close(audio_context->waker[0]);
+rollback_audio_context:
+  free(audio_context);
+  pw_deinit();
+  return NULL;
+}
+
+int AudioContextGetEventsFd(struct AudioContext* audio_context) {
+  return audio_context->waker[0];
+}
+
+bool AudioContextProcessEvents(struct AudioContext* audio_context) {
+  char status;
+  if (read(audio_context->waker[0], &status, sizeof(status)) !=
+      sizeof(status)) {
+    // TODO(mburakov): Then what?..
+    abort();
+  }
+
+  switch (status) {
+    case STATUS_OK:
+      break;
+    case STATUS_ERR:
+      LOG("Error reported from audio thread");
+      return false;
+    default:
+      __builtin_unreachable();
+  }
+
+  for (;;) {
+    struct BufferQueueItem* buffer_queue_item;
+    if (!BufferQueueDequeue(audio_context->buffer_queue, &buffer_queue_item)) {
+      LOG("Failed to dequeue stream buffer copy");
+      return false;
+    }
+    if (!buffer_queue_item) return true;
+    audio_context->callbacks->OnAudioReady(
+        audio_context->user, buffer_queue_item->data, buffer_queue_item->size);
+    BufferQueueItemDestroy(buffer_queue_item);
+  }
+}
+
+void AudioContextDestroy(struct AudioContext* audio_context) {
+  pw_thread_loop_lock(audio_context->pw_thread_loop);
+  pw_stream_destroy(audio_context->pw_stream);
+  pw_thread_loop_unlock(audio_context->pw_thread_loop);
+  pw_thread_loop_destroy(audio_context->pw_thread_loop);
+  BufferQueueDestroy(audio_context->buffer_queue);
+  close(audio_context->waker[1]);
+  close(audio_context->waker[0]);
+  free(audio_context);
+  pw_deinit();
+}
diff --git a/audio.h b/audio.h
new file mode 100644
index 0000000..377a62f
--- /dev/null
+++ b/audio.h
@@ -0,0 +1,36 @@
+/*
+ * Copyright (C) 2024 Mikhail Burakov. This file is part of streamer.
+ *
+ * streamer is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * streamer is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with streamer.  If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#ifndef STREAMER_AUDIO_H_
+#define STREAMER_AUDIO_H_
+
+#include <stdbool.h>
+#include <stddef.h>
+
+struct AudioContext;
+
+struct AudioContextCallbacks {
+  void (*OnAudioReady)(void* user, const void* buffer, size_t size);
+};
+
+struct AudioContext* AudioContextCreate(
+    const struct AudioContextCallbacks* callbacks, void* user);
+int AudioContextGetEventsFd(struct AudioContext* audio_context);
+bool AudioContextProcessEvents(struct AudioContext* audio_context);
+void AudioContextDestroy(struct AudioContext* audio_context);
+
+#endif  // STREAMER_AUDIO_H_
diff --git a/buffer_queue.c b/buffer_queue.c
new file mode 100644
index 0000000..bd29b14
--- /dev/null
+++ b/buffer_queue.c
@@ -0,0 +1,100 @@
+/*
+ * Copyright (C) 2024 Mikhail Burakov. This file is part of streamer.
+ *
+ * streamer is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * streamer is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with streamer.  If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "buffer_queue.h"
+
+#include <stdlib.h>
+#include <string.h>
+#include <threads.h>
+
+struct BufferQueue {
+  mtx_t mutex;
+  struct BufferQueueItem** items;
+  size_t size;
+  size_t alloc;
+};
+
+struct BufferQueueItem* BufferQueueItemCreate(const void* data, size_t size) {
+  struct BufferQueueItem* buffer_queue_item =
+      malloc(sizeof(struct BufferQueueItem) + size);
+  if (!buffer_queue_item) return NULL;
+  buffer_queue_item->size = size;
+  memcpy(buffer_queue_item->data, data, size);
+  return buffer_queue_item;
+}
+
+void BufferQueueItemDestroy(struct BufferQueueItem* buffer_queue_item) {
+  free(buffer_queue_item);
+}
+
+struct BufferQueue* BufferQueueCreate(void) {
+  struct BufferQueue* buffer_queue = calloc(1, sizeof(struct BufferQueue));
+  if (!buffer_queue) return false;
+  if (mtx_init(&buffer_queue->mutex, mtx_plain) != thrd_success)
+    goto rollback_buffer_queue;
+  return buffer_queue;
+
+rollback_buffer_queue:
+  free(buffer_queue);
+  return NULL;
+}
+
+bool BufferQueueQueue(struct BufferQueue* buffer_queue,
+                      struct BufferQueueItem* buffer_queue_item) {
+  if (!buffer_queue_item || mtx_lock(&buffer_queue->mutex) != thrd_success)
+    return false;
+
+  if (buffer_queue->size == buffer_queue->alloc) {
+    size_t alloc = buffer_queue->alloc + 1;
+    struct BufferQueueItem** items =
+        realloc(buffer_queue->items, sizeof(struct BufferQueueItem*) * alloc);
+    if (!items) {
+      mtx_unlock(&buffer_queue->mutex);
+      return false;
+    }
+    buffer_queue->items = items;
+    buffer_queue->alloc = alloc;
+  }
+
+  buffer_queue->items[buffer_queue->size] = buffer_queue_item;
+  buffer_queue->size++;
+  mtx_unlock(&buffer_queue->mutex);
+  return true;
+}
+
+bool BufferQueueDequeue(struct BufferQueue* buffer_queue,
+                        struct BufferQueueItem** buffer_queue_item) {
+  if (mtx_lock(&buffer_queue->mutex) != thrd_success) return false;
+  if (!buffer_queue->size) {
+    *buffer_queue_item = NULL;
+  } else {
+    buffer_queue->size--;
+    *buffer_queue_item = buffer_queue->items[0];
+    memmove(buffer_queue->items, buffer_queue->items + 1,
+            sizeof(struct BufferQueueItem*) * buffer_queue->size);
+  }
+  mtx_unlock(&buffer_queue->mutex);
+  return true;
+}
+
+void BufferQueueDestroy(struct BufferQueue* buffer_queue) {
+  for (size_t i = 0; i < buffer_queue->size; i++)
+    BufferQueueItemDestroy(buffer_queue->items[i]);
+  free(buffer_queue->items);
+  mtx_destroy(&buffer_queue->mutex);
+  free(buffer_queue);
+}
diff --git a/buffer_queue.h b/buffer_queue.h
new file mode 100644
index 0000000..4597fc4
--- /dev/null
+++ b/buffer_queue.h
@@ -0,0 +1,42 @@
+/*
+ * Copyright (C) 2024 Mikhail Burakov. This file is part of streamer.
+ *
+ * streamer is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * streamer is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with streamer.  If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#ifndef STREAMER_BUFFER_QUEUE_H_
+#define STREAMER_BUFFER_QUEUE_H_
+
+#include <stdbool.h>
+#include <stddef.h>
+#include <stdint.h>
+
+struct BufferQueue;
+
+struct BufferQueueItem {
+  size_t size;
+  uint8_t data[];
+};
+
+struct BufferQueueItem* BufferQueueItemCreate(const void* data, size_t size);
+void BufferQueueItemDestroy(struct BufferQueueItem* buffer_queue_item);
+
+struct BufferQueue* BufferQueueCreate(void);
+bool BufferQueueQueue(struct BufferQueue* buffer_queue,
+                      struct BufferQueueItem* buffer_queue_item);
+bool BufferQueueDequeue(struct BufferQueue* buffer_queue,
+                        struct BufferQueueItem** buffer_queue_item);
+void BufferQueueDestroy(struct BufferQueue* buffer_queue);
+
+#endif  // STREAMER_BUFFER_QUEUE_H_
diff --git a/main.c b/main.c
index a180bca..001d0aa 100644
--- a/main.c
+++ b/main.c
@@ -26,6 +26,7 @@
 #include <sys/socket.h>
 #include <unistd.h>
 
+#include "audio.h"
 #include "capture.h"
 #include "colorspace.h"
 #include "encode.h"
@@ -47,6 +48,7 @@ static void OnSignal(int status) { g_signal = status; }
 
 struct Contexts {
   bool disable_uhid;
+  struct AudioContext* audio_context;
   struct GpuContext* gpu_context;
   struct IoMuxer io_muxer;
   int server_fd;
@@ -116,6 +118,16 @@ static void MaybeDropClient(struct Contexts* contexts) {
   }
 }
 
+static void OnAudioContextAudioReady(void* user, const void* buffer,
+                                     size_t size) {
+  struct Contexts* contexts = user;
+  LOG("Got a buffer!");
+  // TODO(mburakov): Implement this!
+  (void)contexts;
+  (void)buffer;
+  (void)size;
+}
+
 static void OnCaptureContextFrameReady(void* user,
                                        const struct GpuFrame* captured_frame) {
   struct Contexts* contexts = user;
@@ -191,6 +203,22 @@ drop_client:
   MaybeDropClient(contexts);
 }
 
+static void OnAudioContextEvents(void* user) {
+  struct Contexts* contexts = user;
+  if (!IoMuxerOnRead(&contexts->io_muxer,
+                     AudioContextGetEventsFd(contexts->audio_context),
+                     &OnAudioContextEvents, user)) {
+    LOG("Failed to reschedule audio io (%s)", strerror(errno));
+    g_signal = SIGABRT;
+    return;
+  }
+  if (!AudioContextProcessEvents(contexts->audio_context)) {
+    LOG("Failed to process audio events");
+    g_signal = SIGABRT;
+    return;
+  }
+}
+
 static void OnCaptureContextEvents(void* user) {
   struct Contexts* contexts = user;
   if (!IoMuxerOnRead(&contexts->io_muxer,
@@ -294,22 +322,42 @@ int main(int argc, char* argv[]) {
       contexts.disable_uhid = true;
     }
   }
+
+  static struct AudioContextCallbacks kAudioContextCallbacks = {
+      .OnAudioReady = OnAudioContextAudioReady,
+  };
+  contexts.audio_context =
+      AudioContextCreate(&kAudioContextCallbacks, &contexts);
+  if (!contexts.audio_context) {
+    LOG("Failed to create audio context");
+    return EXIT_FAILURE;
+  }
+
   contexts.gpu_context = GpuContextCreate(colorspace, range);
   if (!contexts.gpu_context) {
     LOG("Failed to create gpu context");
-    return EXIT_FAILURE;
+    goto rollback_audio_context;
   }
+
   IoMuxerCreate(&contexts.io_muxer);
   contexts.server_fd = CreateServerSocket(argv[1]);
   if (contexts.server_fd == -1) {
     LOG("Failed to create server socket");
     goto rollback_io_muxer;
   }
+
+  if (!IoMuxerOnRead(&contexts.io_muxer,
+                     AudioContextGetEventsFd(contexts.audio_context),
+                     &OnAudioContextEvents, &contexts)) {
+    LOG("Failed to schedule audio io (%s)", strerror(errno));
+    goto rollback_server_fd;
+  }
   if (!IoMuxerOnRead(&contexts.io_muxer, contexts.server_fd,
                      &OnClientConnecting, &contexts)) {
     LOG("Failed to schedule accept (%s)", strerror(errno));
     goto rollback_server_fd;
   }
+
   while (!g_signal) {
     if (IoMuxerIterate(&contexts.io_muxer, -1) && errno != EINTR) {
       LOG("Failed to iterate io muxer (%s)", strerror(errno));
@@ -327,6 +375,8 @@ rollback_server_fd:
 rollback_io_muxer:
   IoMuxerDestroy(&contexts.io_muxer);
   GpuContextDestroy(contexts.gpu_context);
+rollback_audio_context:
+  AudioContextDestroy(contexts.audio_context);
   bool result = g_signal == SIGINT || g_signal == SIGTERM;
   return result ? EXIT_SUCCESS : EXIT_FAILURE;
 }
diff --git a/makefile b/makefile
index 6785689..1c5b018 100644
--- a/makefile
+++ b/makefile
@@ -12,6 +12,7 @@ libs:=\
 	gbm \
 	glesv2 \
 	libdrm \
+	libpipewire-0.3 \
 	libva \
 	libva-drm
 
-- 
cgit v1.2.3