summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMikhail Burakov <mburakov@mailbox.org>2024-05-09 14:47:46 +0200
committerMikhail Burakov <mburakov@mailbox.org>2024-05-09 14:47:46 +0200
commit918682faa29197fe3b0a544906638e02785a6b17 (patch)
treeb183de608318581462574f78a3e95a18b3ef0a3d
parent7d3616cdb75194d9ec72744af6ee4c7b8276b746 (diff)
Initial audio capturing via virtual pipewire sink
-rw-r--r--audio.c279
-rw-r--r--audio.h36
-rw-r--r--buffer_queue.c100
-rw-r--r--buffer_queue.h42
-rw-r--r--main.c52
-rw-r--r--makefile1
6 files changed, 509 insertions, 1 deletions
diff --git a/audio.c b/audio.c
new file mode 100644
index 0000000..358382a
--- /dev/null
+++ b/audio.c
@@ -0,0 +1,279 @@
+/*
+ * Copyright (C) 2024 Mikhail Burakov. This file is part of streamer.
+ *
+ * streamer is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * streamer is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with streamer. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "audio.h"
+
+#include <errno.h>
+#include <pipewire/pipewire.h>
+#include <spa/param/audio/raw-utils.h>
+#include <spa/utils/result.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
+#include "buffer_queue.h"
+#include "toolbox/utils.h"
+
+#define STATUS_OK 0
+#define STATUS_ERR 1
+
+struct AudioContext {
+ const struct AudioContextCallbacks* callbacks;
+ void* user;
+
+ int waker[2];
+ struct BufferQueue* buffer_queue;
+ struct pw_thread_loop* pw_thread_loop;
+ struct pw_stream* pw_stream;
+};
+
+static void WakeClient(const struct AudioContext* audio_context, char status) {
+ if (write(audio_context->waker[1], &status, sizeof(status)) !=
+ sizeof(status)) {
+ // TODO(mburakov): Then what?..
+ abort();
+ }
+}
+
+static void OnStreamStateChanged(void* data, enum pw_stream_state old,
+ enum pw_stream_state state,
+ const char* error) {
+ (void)data;
+ LOG("Stream state change %s->%s, error is %s", pw_stream_state_as_string(old),
+ pw_stream_state_as_string(state), error);
+}
+
+static void OnStreamParamChanged(void* data, uint32_t id,
+ const struct spa_pod* param) {
+ struct AudioContext* audio_context = data;
+ if (param == NULL || id != SPA_PARAM_Format) return;
+
+ struct spa_audio_info audio_info;
+ if (spa_format_parse(param, &audio_info.media_type,
+ &audio_info.media_subtype) < 0) {
+ LOG("Failed to parse stream format");
+ goto failure;
+ }
+ if (audio_info.media_type != SPA_MEDIA_TYPE_audio ||
+ audio_info.media_subtype != SPA_MEDIA_SUBTYPE_raw) {
+ LOG("Unexpected stream format");
+ goto failure;
+ }
+ if (spa_format_audio_raw_parse(param, &audio_info.info.raw) < 0) {
+ LOG("Faield to parse audio stream format");
+ goto failure;
+ }
+
+ LOG("Capturing rate: %u, channels: %u", audio_info.info.raw.rate,
+ audio_info.info.raw.channels);
+ return;
+
+failure:
+ pw_thread_loop_stop(audio_context->pw_thread_loop);
+ WakeClient(audio_context, STATUS_ERR);
+}
+
+static void OnStreamProcess(void* data) {
+ struct AudioContext* audio_context = data;
+ struct pw_buffer* pw_buffer =
+ pw_stream_dequeue_buffer(audio_context->pw_stream);
+ if (!pw_buffer) {
+ LOG("Failed to dequeue stream buffer");
+ goto failure;
+ }
+
+ for (uint32_t i = 0; i < pw_buffer->buffer->n_datas; i++) {
+ const struct spa_data* spa_data = pw_buffer->buffer->datas + i;
+ const void* buffer = (const uint8_t*)spa_data->data +
+ spa_data->chunk->offset % spa_data->maxsize;
+ uint32_t size = MIN(spa_data->chunk->size, spa_data->maxsize);
+ struct BufferQueueItem* buffer_queue_item =
+ BufferQueueItemCreate(buffer, size);
+ if (!buffer_queue_item) {
+ LOG("Failed to copy stream buffer");
+ goto failure;
+ }
+ if (!BufferQueueQueue(audio_context->buffer_queue, buffer_queue_item)) {
+ LOG("Failed to queue stream buffer copy");
+ BufferQueueItemDestroy(buffer_queue_item);
+ goto failure;
+ }
+ }
+
+ pw_stream_queue_buffer(audio_context->pw_stream, pw_buffer);
+ WakeClient(audio_context, STATUS_OK);
+ return;
+
+failure:
+ pw_thread_loop_stop(audio_context->pw_thread_loop);
+ WakeClient(audio_context, STATUS_ERR);
+}
+
+struct AudioContext* AudioContextCreate(
+ const struct AudioContextCallbacks* callbacks, void* user) {
+ pw_init(0, NULL);
+ struct AudioContext* audio_context = malloc(sizeof(struct AudioContext));
+ if (!audio_context) {
+ LOG("Failed to allocate audio context (%s)", strerror(errno));
+ return NULL;
+ }
+ *audio_context = (struct AudioContext){
+ .callbacks = callbacks,
+ .user = user,
+ };
+
+ if (pipe(audio_context->waker)) {
+ LOG("Failed to create pipe (%s)", strerror(errno));
+ goto rollback_audio_context;
+ }
+
+ audio_context->buffer_queue = BufferQueueCreate();
+ if (!audio_context->buffer_queue) {
+ LOG("Failed to create buffer queue (%s)", strerror(errno));
+ goto rollback_waker;
+ }
+
+ audio_context->pw_thread_loop = pw_thread_loop_new("audio-capture", NULL);
+ if (!audio_context->pw_thread_loop) {
+ LOG("Failed to create pipewire thread loop");
+ goto rollback_buffer_queue;
+ }
+
+ pw_thread_loop_lock(audio_context->pw_thread_loop);
+ int err = pw_thread_loop_start(audio_context->pw_thread_loop);
+ if (err) {
+ LOG("Failed to start pipewire thread loop (%s)", spa_strerror(err));
+ pw_thread_loop_unlock(audio_context->pw_thread_loop);
+ goto rollback_thread_loop;
+ }
+
+ // TOOD(mburakov): Read these from the commandline?
+ struct pw_properties* pw_properties = pw_properties_new(
+#define _(...) __VA_ARGS__
+ _(PW_KEY_AUDIO_FORMAT, "S16LE"), _(PW_KEY_AUDIO_RATE, "48000"),
+ _(PW_KEY_AUDIO_CHANNELS, "2"), _(SPA_KEY_AUDIO_POSITION, "[ FL FR ]"),
+ _(PW_KEY_NODE_NAME, "streamer-sink"), _(PW_KEY_NODE_VIRTUAL, "true"),
+ _(PW_KEY_MEDIA_CLASS, "Audio/Sink"), NULL
+#undef _
+ );
+ if (!pw_properties) {
+ LOG("Failed to create pipewire properties");
+ pw_thread_loop_unlock(audio_context->pw_thread_loop);
+ goto rollback_thread_loop;
+ }
+
+ static const struct pw_stream_events kPwStreamEvents = {
+ .version = PW_VERSION_STREAM_EVENTS,
+ .state_changed = OnStreamStateChanged,
+ .param_changed = OnStreamParamChanged,
+ .process = OnStreamProcess,
+ };
+ audio_context->pw_stream = pw_stream_new_simple(
+ pw_thread_loop_get_loop(audio_context->pw_thread_loop), "audio-capture",
+ pw_properties, &kPwStreamEvents, audio_context);
+ if (!audio_context->pw_stream) {
+ LOG("Failed to create pipewire stream");
+ pw_thread_loop_unlock(audio_context->pw_thread_loop);
+ goto rollback_thread_loop;
+ }
+
+ uint8_t buffer[1024];
+ struct spa_pod_builder spa_pod_builder =
+ SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
+ const struct spa_pod* params[] = {
+ spa_format_audio_raw_build(
+ &spa_pod_builder, SPA_PARAM_EnumFormat,
+ &SPA_AUDIO_INFO_RAW_INIT(.format = SPA_AUDIO_FORMAT_S16_LE,
+ .rate = 48000, .channels = 2,
+ .position = {SPA_AUDIO_CHANNEL_FL,
+ SPA_AUDIO_CHANNEL_FR})),
+ };
+ static const enum pw_stream_flags kPwStreamFlags =
+ PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_MAP_BUFFERS |
+ PW_STREAM_FLAG_RT_PROCESS;
+ if (pw_stream_connect(audio_context->pw_stream, PW_DIRECTION_INPUT, PW_ID_ANY,
+ kPwStreamFlags, params, LENGTH(params))) {
+ LOG("Failed to connect pipewire stream");
+ pw_stream_destroy(audio_context->pw_stream);
+ pw_thread_loop_unlock(audio_context->pw_thread_loop);
+ goto rollback_thread_loop;
+ }
+
+ pw_thread_loop_unlock(audio_context->pw_thread_loop);
+ return audio_context;
+
+rollback_thread_loop:
+ pw_thread_loop_destroy(audio_context->pw_thread_loop);
+rollback_buffer_queue:
+ BufferQueueDestroy(audio_context->buffer_queue);
+rollback_waker:
+ close(audio_context->waker[1]);
+ close(audio_context->waker[0]);
+rollback_audio_context:
+ free(audio_context);
+ pw_deinit();
+ return NULL;
+}
+
+int AudioContextGetEventsFd(struct AudioContext* audio_context) {
+ return audio_context->waker[0];
+}
+
+bool AudioContextProcessEvents(struct AudioContext* audio_context) {
+ char status;
+ if (read(audio_context->waker[0], &status, sizeof(status)) !=
+ sizeof(status)) {
+ // TODO(mburakov): Then what?..
+ abort();
+ }
+
+ switch (status) {
+ case STATUS_OK:
+ break;
+ case STATUS_ERR:
+ LOG("Error reported from audio thread");
+ return false;
+ default:
+ __builtin_unreachable();
+ }
+
+ for (;;) {
+ struct BufferQueueItem* buffer_queue_item;
+ if (!BufferQueueDequeue(audio_context->buffer_queue, &buffer_queue_item)) {
+ LOG("Failed to dequeue stream buffer copy");
+ return false;
+ }
+ if (!buffer_queue_item) return true;
+ audio_context->callbacks->OnAudioReady(
+ audio_context->user, buffer_queue_item->data, buffer_queue_item->size);
+ BufferQueueItemDestroy(buffer_queue_item);
+ }
+}
+
+void AudioContextDestroy(struct AudioContext* audio_context) {
+ pw_thread_loop_lock(audio_context->pw_thread_loop);
+ pw_stream_destroy(audio_context->pw_stream);
+ pw_thread_loop_unlock(audio_context->pw_thread_loop);
+ pw_thread_loop_destroy(audio_context->pw_thread_loop);
+ BufferQueueDestroy(audio_context->buffer_queue);
+ close(audio_context->waker[1]);
+ close(audio_context->waker[0]);
+ free(audio_context);
+ pw_deinit();
+}
diff --git a/audio.h b/audio.h
new file mode 100644
index 0000000..377a62f
--- /dev/null
+++ b/audio.h
@@ -0,0 +1,36 @@
+/*
+ * Copyright (C) 2024 Mikhail Burakov. This file is part of streamer.
+ *
+ * streamer is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * streamer is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with streamer. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#ifndef STREAMER_AUDIO_H_
+#define STREAMER_AUDIO_H_
+
+#include <stdbool.h>
+#include <stddef.h>
+
+struct AudioContext;
+
+struct AudioContextCallbacks {
+ void (*OnAudioReady)(void* user, const void* buffer, size_t size);
+};
+
+struct AudioContext* AudioContextCreate(
+ const struct AudioContextCallbacks* callbacks, void* user);
+int AudioContextGetEventsFd(struct AudioContext* audio_context);
+bool AudioContextProcessEvents(struct AudioContext* audio_context);
+void AudioContextDestroy(struct AudioContext* audio_context);
+
+#endif // STREAMER_AUDIO_H_
diff --git a/buffer_queue.c b/buffer_queue.c
new file mode 100644
index 0000000..bd29b14
--- /dev/null
+++ b/buffer_queue.c
@@ -0,0 +1,100 @@
+/*
+ * Copyright (C) 2024 Mikhail Burakov. This file is part of streamer.
+ *
+ * streamer is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * streamer is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with streamer. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "buffer_queue.h"
+
+#include <stdlib.h>
+#include <string.h>
+#include <threads.h>
+
+struct BufferQueue {
+ mtx_t mutex;
+ struct BufferQueueItem** items;
+ size_t size;
+ size_t alloc;
+};
+
+struct BufferQueueItem* BufferQueueItemCreate(const void* data, size_t size) {
+ struct BufferQueueItem* buffer_queue_item =
+ malloc(sizeof(struct BufferQueueItem) + size);
+ if (!buffer_queue_item) return NULL;
+ buffer_queue_item->size = size;
+ memcpy(buffer_queue_item->data, data, size);
+ return buffer_queue_item;
+}
+
+void BufferQueueItemDestroy(struct BufferQueueItem* buffer_queue_item) {
+ free(buffer_queue_item);
+}
+
+struct BufferQueue* BufferQueueCreate(void) {
+ struct BufferQueue* buffer_queue = calloc(1, sizeof(struct BufferQueue));
+ if (!buffer_queue) return false;
+ if (mtx_init(&buffer_queue->mutex, mtx_plain) != thrd_success)
+ goto rollback_buffer_queue;
+ return buffer_queue;
+
+rollback_buffer_queue:
+ free(buffer_queue);
+ return NULL;
+}
+
+bool BufferQueueQueue(struct BufferQueue* buffer_queue,
+ struct BufferQueueItem* buffer_queue_item) {
+ if (!buffer_queue_item || mtx_lock(&buffer_queue->mutex) != thrd_success)
+ return false;
+
+ if (buffer_queue->size == buffer_queue->alloc) {
+ size_t alloc = buffer_queue->alloc + 1;
+ struct BufferQueueItem** items =
+ realloc(buffer_queue->items, sizeof(struct BufferQueueItem*) * alloc);
+ if (!items) {
+ mtx_unlock(&buffer_queue->mutex);
+ return false;
+ }
+ buffer_queue->items = items;
+ buffer_queue->alloc = alloc;
+ }
+
+ buffer_queue->items[buffer_queue->size] = buffer_queue_item;
+ buffer_queue->size++;
+ mtx_unlock(&buffer_queue->mutex);
+ return true;
+}
+
+bool BufferQueueDequeue(struct BufferQueue* buffer_queue,
+ struct BufferQueueItem** buffer_queue_item) {
+ if (mtx_lock(&buffer_queue->mutex) != thrd_success) return false;
+ if (!buffer_queue->size) {
+ *buffer_queue_item = NULL;
+ } else {
+ buffer_queue->size--;
+ *buffer_queue_item = buffer_queue->items[0];
+ memmove(buffer_queue->items, buffer_queue->items + 1,
+ sizeof(struct BufferQueueItem*) * buffer_queue->size);
+ }
+ mtx_unlock(&buffer_queue->mutex);
+ return true;
+}
+
+void BufferQueueDestroy(struct BufferQueue* buffer_queue) {
+ for (size_t i = 0; i < buffer_queue->size; i++)
+ BufferQueueItemDestroy(buffer_queue->items[i]);
+ free(buffer_queue->items);
+ mtx_destroy(&buffer_queue->mutex);
+ free(buffer_queue);
+}
diff --git a/buffer_queue.h b/buffer_queue.h
new file mode 100644
index 0000000..4597fc4
--- /dev/null
+++ b/buffer_queue.h
@@ -0,0 +1,42 @@
+/*
+ * Copyright (C) 2024 Mikhail Burakov. This file is part of streamer.
+ *
+ * streamer is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * streamer is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with streamer. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#ifndef STREAMER_BUFFER_QUEUE_H_
+#define STREAMER_BUFFER_QUEUE_H_
+
+#include <stdbool.h>
+#include <stddef.h>
+#include <stdint.h>
+
+struct BufferQueue;
+
+struct BufferQueueItem {
+ size_t size;
+ uint8_t data[];
+};
+
+struct BufferQueueItem* BufferQueueItemCreate(const void* data, size_t size);
+void BufferQueueItemDestroy(struct BufferQueueItem* buffer_queue_item);
+
+struct BufferQueue* BufferQueueCreate(void);
+bool BufferQueueQueue(struct BufferQueue* buffer_queue,
+ struct BufferQueueItem* buffer_queue_item);
+bool BufferQueueDequeue(struct BufferQueue* buffer_queue,
+ struct BufferQueueItem** buffer_queue_item);
+void BufferQueueDestroy(struct BufferQueue* buffer_queue);
+
+#endif // STREAMER_BUFFER_QUEUE_H_
diff --git a/main.c b/main.c
index a180bca..001d0aa 100644
--- a/main.c
+++ b/main.c
@@ -26,6 +26,7 @@
#include <sys/socket.h>
#include <unistd.h>
+#include "audio.h"
#include "capture.h"
#include "colorspace.h"
#include "encode.h"
@@ -47,6 +48,7 @@ static void OnSignal(int status) { g_signal = status; }
struct Contexts {
bool disable_uhid;
+ struct AudioContext* audio_context;
struct GpuContext* gpu_context;
struct IoMuxer io_muxer;
int server_fd;
@@ -116,6 +118,16 @@ static void MaybeDropClient(struct Contexts* contexts) {
}
}
+static void OnAudioContextAudioReady(void* user, const void* buffer,
+ size_t size) {
+ struct Contexts* contexts = user;
+ LOG("Got a buffer!");
+ // TODO(mburakov): Implement this!
+ (void)contexts;
+ (void)buffer;
+ (void)size;
+}
+
static void OnCaptureContextFrameReady(void* user,
const struct GpuFrame* captured_frame) {
struct Contexts* contexts = user;
@@ -191,6 +203,22 @@ drop_client:
MaybeDropClient(contexts);
}
+static void OnAudioContextEvents(void* user) {
+ struct Contexts* contexts = user;
+ if (!IoMuxerOnRead(&contexts->io_muxer,
+ AudioContextGetEventsFd(contexts->audio_context),
+ &OnAudioContextEvents, user)) {
+ LOG("Failed to reschedule audio io (%s)", strerror(errno));
+ g_signal = SIGABRT;
+ return;
+ }
+ if (!AudioContextProcessEvents(contexts->audio_context)) {
+ LOG("Failed to process audio events");
+ g_signal = SIGABRT;
+ return;
+ }
+}
+
static void OnCaptureContextEvents(void* user) {
struct Contexts* contexts = user;
if (!IoMuxerOnRead(&contexts->io_muxer,
@@ -294,22 +322,42 @@ int main(int argc, char* argv[]) {
contexts.disable_uhid = true;
}
}
+
+ static struct AudioContextCallbacks kAudioContextCallbacks = {
+ .OnAudioReady = OnAudioContextAudioReady,
+ };
+ contexts.audio_context =
+ AudioContextCreate(&kAudioContextCallbacks, &contexts);
+ if (!contexts.audio_context) {
+ LOG("Failed to create audio context");
+ return EXIT_FAILURE;
+ }
+
contexts.gpu_context = GpuContextCreate(colorspace, range);
if (!contexts.gpu_context) {
LOG("Failed to create gpu context");
- return EXIT_FAILURE;
+ goto rollback_audio_context;
}
+
IoMuxerCreate(&contexts.io_muxer);
contexts.server_fd = CreateServerSocket(argv[1]);
if (contexts.server_fd == -1) {
LOG("Failed to create server socket");
goto rollback_io_muxer;
}
+
+ if (!IoMuxerOnRead(&contexts.io_muxer,
+ AudioContextGetEventsFd(contexts.audio_context),
+ &OnAudioContextEvents, &contexts)) {
+ LOG("Failed to schedule audio io (%s)", strerror(errno));
+ goto rollback_server_fd;
+ }
if (!IoMuxerOnRead(&contexts.io_muxer, contexts.server_fd,
&OnClientConnecting, &contexts)) {
LOG("Failed to schedule accept (%s)", strerror(errno));
goto rollback_server_fd;
}
+
while (!g_signal) {
if (IoMuxerIterate(&contexts.io_muxer, -1) && errno != EINTR) {
LOG("Failed to iterate io muxer (%s)", strerror(errno));
@@ -327,6 +375,8 @@ rollback_server_fd:
rollback_io_muxer:
IoMuxerDestroy(&contexts.io_muxer);
GpuContextDestroy(contexts.gpu_context);
+rollback_audio_context:
+ AudioContextDestroy(contexts.audio_context);
bool result = g_signal == SIGINT || g_signal == SIGTERM;
return result ? EXIT_SUCCESS : EXIT_FAILURE;
}
diff --git a/makefile b/makefile
index 6785689..1c5b018 100644
--- a/makefile
+++ b/makefile
@@ -12,6 +12,7 @@ libs:=\
gbm \
glesv2 \
libdrm \
+ libpipewire-0.3 \
libva \
libva-drm