diff options
author | Mikhail Burakov <mburakov@mailbox.org> | 2024-05-09 14:47:46 +0200 |
---|---|---|
committer | Mikhail Burakov <mburakov@mailbox.org> | 2024-05-09 14:47:46 +0200 |
commit | 918682faa29197fe3b0a544906638e02785a6b17 (patch) | |
tree | b183de608318581462574f78a3e95a18b3ef0a3d | |
parent | 7d3616cdb75194d9ec72744af6ee4c7b8276b746 (diff) |
Initial audio capturing via virtual pipewire sink
-rw-r--r-- | audio.c | 279 | ||||
-rw-r--r-- | audio.h | 36 | ||||
-rw-r--r-- | buffer_queue.c | 100 | ||||
-rw-r--r-- | buffer_queue.h | 42 | ||||
-rw-r--r-- | main.c | 52 | ||||
-rw-r--r-- | makefile | 1 |
6 files changed, 509 insertions, 1 deletions
@@ -0,0 +1,279 @@ +/* + * Copyright (C) 2024 Mikhail Burakov. This file is part of streamer. + * + * streamer is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * streamer is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with streamer. If not, see <https://www.gnu.org/licenses/>. + */ + +#include "audio.h" + +#include <errno.h> +#include <pipewire/pipewire.h> +#include <spa/param/audio/raw-utils.h> +#include <spa/utils/result.h> +#include <stdint.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> + +#include "buffer_queue.h" +#include "toolbox/utils.h" + +#define STATUS_OK 0 +#define STATUS_ERR 1 + +struct AudioContext { + const struct AudioContextCallbacks* callbacks; + void* user; + + int waker[2]; + struct BufferQueue* buffer_queue; + struct pw_thread_loop* pw_thread_loop; + struct pw_stream* pw_stream; +}; + +static void WakeClient(const struct AudioContext* audio_context, char status) { + if (write(audio_context->waker[1], &status, sizeof(status)) != + sizeof(status)) { + // TODO(mburakov): Then what?.. + abort(); + } +} + +static void OnStreamStateChanged(void* data, enum pw_stream_state old, + enum pw_stream_state state, + const char* error) { + (void)data; + LOG("Stream state change %s->%s, error is %s", pw_stream_state_as_string(old), + pw_stream_state_as_string(state), error); +} + +static void OnStreamParamChanged(void* data, uint32_t id, + const struct spa_pod* param) { + struct AudioContext* audio_context = data; + if (param == NULL || id != SPA_PARAM_Format) return; + + struct spa_audio_info audio_info; + if (spa_format_parse(param, &audio_info.media_type, + &audio_info.media_subtype) < 0) { + LOG("Failed to parse stream format"); + goto failure; + } + if (audio_info.media_type != SPA_MEDIA_TYPE_audio || + audio_info.media_subtype != SPA_MEDIA_SUBTYPE_raw) { + LOG("Unexpected stream format"); + goto failure; + } + if (spa_format_audio_raw_parse(param, &audio_info.info.raw) < 0) { + LOG("Faield to parse audio stream format"); + goto failure; + } + + LOG("Capturing rate: %u, channels: %u", audio_info.info.raw.rate, + audio_info.info.raw.channels); + return; + +failure: + pw_thread_loop_stop(audio_context->pw_thread_loop); + WakeClient(audio_context, STATUS_ERR); +} + +static void OnStreamProcess(void* data) { + struct AudioContext* audio_context = data; + struct pw_buffer* pw_buffer = + pw_stream_dequeue_buffer(audio_context->pw_stream); + if (!pw_buffer) { + LOG("Failed to dequeue stream buffer"); + goto failure; + } + + for (uint32_t i = 0; i < pw_buffer->buffer->n_datas; i++) { + const struct spa_data* spa_data = pw_buffer->buffer->datas + i; + const void* buffer = (const uint8_t*)spa_data->data + + spa_data->chunk->offset % spa_data->maxsize; + uint32_t size = MIN(spa_data->chunk->size, spa_data->maxsize); + struct BufferQueueItem* buffer_queue_item = + BufferQueueItemCreate(buffer, size); + if (!buffer_queue_item) { + LOG("Failed to copy stream buffer"); + goto failure; + } + if (!BufferQueueQueue(audio_context->buffer_queue, buffer_queue_item)) { + LOG("Failed to queue stream buffer copy"); + BufferQueueItemDestroy(buffer_queue_item); + goto failure; + } + } + + pw_stream_queue_buffer(audio_context->pw_stream, pw_buffer); + WakeClient(audio_context, STATUS_OK); + return; + +failure: + pw_thread_loop_stop(audio_context->pw_thread_loop); + WakeClient(audio_context, STATUS_ERR); +} + +struct AudioContext* AudioContextCreate( + const struct AudioContextCallbacks* callbacks, void* user) { + pw_init(0, NULL); + struct AudioContext* audio_context = malloc(sizeof(struct AudioContext)); + if (!audio_context) { + LOG("Failed to allocate audio context (%s)", strerror(errno)); + return NULL; + } + *audio_context = (struct AudioContext){ + .callbacks = callbacks, + .user = user, + }; + + if (pipe(audio_context->waker)) { + LOG("Failed to create pipe (%s)", strerror(errno)); + goto rollback_audio_context; + } + + audio_context->buffer_queue = BufferQueueCreate(); + if (!audio_context->buffer_queue) { + LOG("Failed to create buffer queue (%s)", strerror(errno)); + goto rollback_waker; + } + + audio_context->pw_thread_loop = pw_thread_loop_new("audio-capture", NULL); + if (!audio_context->pw_thread_loop) { + LOG("Failed to create pipewire thread loop"); + goto rollback_buffer_queue; + } + + pw_thread_loop_lock(audio_context->pw_thread_loop); + int err = pw_thread_loop_start(audio_context->pw_thread_loop); + if (err) { + LOG("Failed to start pipewire thread loop (%s)", spa_strerror(err)); + pw_thread_loop_unlock(audio_context->pw_thread_loop); + goto rollback_thread_loop; + } + + // TOOD(mburakov): Read these from the commandline? + struct pw_properties* pw_properties = pw_properties_new( +#define _(...) __VA_ARGS__ + _(PW_KEY_AUDIO_FORMAT, "S16LE"), _(PW_KEY_AUDIO_RATE, "48000"), + _(PW_KEY_AUDIO_CHANNELS, "2"), _(SPA_KEY_AUDIO_POSITION, "[ FL FR ]"), + _(PW_KEY_NODE_NAME, "streamer-sink"), _(PW_KEY_NODE_VIRTUAL, "true"), + _(PW_KEY_MEDIA_CLASS, "Audio/Sink"), NULL +#undef _ + ); + if (!pw_properties) { + LOG("Failed to create pipewire properties"); + pw_thread_loop_unlock(audio_context->pw_thread_loop); + goto rollback_thread_loop; + } + + static const struct pw_stream_events kPwStreamEvents = { + .version = PW_VERSION_STREAM_EVENTS, + .state_changed = OnStreamStateChanged, + .param_changed = OnStreamParamChanged, + .process = OnStreamProcess, + }; + audio_context->pw_stream = pw_stream_new_simple( + pw_thread_loop_get_loop(audio_context->pw_thread_loop), "audio-capture", + pw_properties, &kPwStreamEvents, audio_context); + if (!audio_context->pw_stream) { + LOG("Failed to create pipewire stream"); + pw_thread_loop_unlock(audio_context->pw_thread_loop); + goto rollback_thread_loop; + } + + uint8_t buffer[1024]; + struct spa_pod_builder spa_pod_builder = + SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); + const struct spa_pod* params[] = { + spa_format_audio_raw_build( + &spa_pod_builder, SPA_PARAM_EnumFormat, + &SPA_AUDIO_INFO_RAW_INIT(.format = SPA_AUDIO_FORMAT_S16_LE, + .rate = 48000, .channels = 2, + .position = {SPA_AUDIO_CHANNEL_FL, + SPA_AUDIO_CHANNEL_FR})), + }; + static const enum pw_stream_flags kPwStreamFlags = + PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_MAP_BUFFERS | + PW_STREAM_FLAG_RT_PROCESS; + if (pw_stream_connect(audio_context->pw_stream, PW_DIRECTION_INPUT, PW_ID_ANY, + kPwStreamFlags, params, LENGTH(params))) { + LOG("Failed to connect pipewire stream"); + pw_stream_destroy(audio_context->pw_stream); + pw_thread_loop_unlock(audio_context->pw_thread_loop); + goto rollback_thread_loop; + } + + pw_thread_loop_unlock(audio_context->pw_thread_loop); + return audio_context; + +rollback_thread_loop: + pw_thread_loop_destroy(audio_context->pw_thread_loop); +rollback_buffer_queue: + BufferQueueDestroy(audio_context->buffer_queue); +rollback_waker: + close(audio_context->waker[1]); + close(audio_context->waker[0]); +rollback_audio_context: + free(audio_context); + pw_deinit(); + return NULL; +} + +int AudioContextGetEventsFd(struct AudioContext* audio_context) { + return audio_context->waker[0]; +} + +bool AudioContextProcessEvents(struct AudioContext* audio_context) { + char status; + if (read(audio_context->waker[0], &status, sizeof(status)) != + sizeof(status)) { + // TODO(mburakov): Then what?.. + abort(); + } + + switch (status) { + case STATUS_OK: + break; + case STATUS_ERR: + LOG("Error reported from audio thread"); + return false; + default: + __builtin_unreachable(); + } + + for (;;) { + struct BufferQueueItem* buffer_queue_item; + if (!BufferQueueDequeue(audio_context->buffer_queue, &buffer_queue_item)) { + LOG("Failed to dequeue stream buffer copy"); + return false; + } + if (!buffer_queue_item) return true; + audio_context->callbacks->OnAudioReady( + audio_context->user, buffer_queue_item->data, buffer_queue_item->size); + BufferQueueItemDestroy(buffer_queue_item); + } +} + +void AudioContextDestroy(struct AudioContext* audio_context) { + pw_thread_loop_lock(audio_context->pw_thread_loop); + pw_stream_destroy(audio_context->pw_stream); + pw_thread_loop_unlock(audio_context->pw_thread_loop); + pw_thread_loop_destroy(audio_context->pw_thread_loop); + BufferQueueDestroy(audio_context->buffer_queue); + close(audio_context->waker[1]); + close(audio_context->waker[0]); + free(audio_context); + pw_deinit(); +} @@ -0,0 +1,36 @@ +/* + * Copyright (C) 2024 Mikhail Burakov. This file is part of streamer. + * + * streamer is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * streamer is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with streamer. If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef STREAMER_AUDIO_H_ +#define STREAMER_AUDIO_H_ + +#include <stdbool.h> +#include <stddef.h> + +struct AudioContext; + +struct AudioContextCallbacks { + void (*OnAudioReady)(void* user, const void* buffer, size_t size); +}; + +struct AudioContext* AudioContextCreate( + const struct AudioContextCallbacks* callbacks, void* user); +int AudioContextGetEventsFd(struct AudioContext* audio_context); +bool AudioContextProcessEvents(struct AudioContext* audio_context); +void AudioContextDestroy(struct AudioContext* audio_context); + +#endif // STREAMER_AUDIO_H_ diff --git a/buffer_queue.c b/buffer_queue.c new file mode 100644 index 0000000..bd29b14 --- /dev/null +++ b/buffer_queue.c @@ -0,0 +1,100 @@ +/* + * Copyright (C) 2024 Mikhail Burakov. This file is part of streamer. + * + * streamer is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * streamer is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with streamer. If not, see <https://www.gnu.org/licenses/>. + */ + +#include "buffer_queue.h" + +#include <stdlib.h> +#include <string.h> +#include <threads.h> + +struct BufferQueue { + mtx_t mutex; + struct BufferQueueItem** items; + size_t size; + size_t alloc; +}; + +struct BufferQueueItem* BufferQueueItemCreate(const void* data, size_t size) { + struct BufferQueueItem* buffer_queue_item = + malloc(sizeof(struct BufferQueueItem) + size); + if (!buffer_queue_item) return NULL; + buffer_queue_item->size = size; + memcpy(buffer_queue_item->data, data, size); + return buffer_queue_item; +} + +void BufferQueueItemDestroy(struct BufferQueueItem* buffer_queue_item) { + free(buffer_queue_item); +} + +struct BufferQueue* BufferQueueCreate(void) { + struct BufferQueue* buffer_queue = calloc(1, sizeof(struct BufferQueue)); + if (!buffer_queue) return false; + if (mtx_init(&buffer_queue->mutex, mtx_plain) != thrd_success) + goto rollback_buffer_queue; + return buffer_queue; + +rollback_buffer_queue: + free(buffer_queue); + return NULL; +} + +bool BufferQueueQueue(struct BufferQueue* buffer_queue, + struct BufferQueueItem* buffer_queue_item) { + if (!buffer_queue_item || mtx_lock(&buffer_queue->mutex) != thrd_success) + return false; + + if (buffer_queue->size == buffer_queue->alloc) { + size_t alloc = buffer_queue->alloc + 1; + struct BufferQueueItem** items = + realloc(buffer_queue->items, sizeof(struct BufferQueueItem*) * alloc); + if (!items) { + mtx_unlock(&buffer_queue->mutex); + return false; + } + buffer_queue->items = items; + buffer_queue->alloc = alloc; + } + + buffer_queue->items[buffer_queue->size] = buffer_queue_item; + buffer_queue->size++; + mtx_unlock(&buffer_queue->mutex); + return true; +} + +bool BufferQueueDequeue(struct BufferQueue* buffer_queue, + struct BufferQueueItem** buffer_queue_item) { + if (mtx_lock(&buffer_queue->mutex) != thrd_success) return false; + if (!buffer_queue->size) { + *buffer_queue_item = NULL; + } else { + buffer_queue->size--; + *buffer_queue_item = buffer_queue->items[0]; + memmove(buffer_queue->items, buffer_queue->items + 1, + sizeof(struct BufferQueueItem*) * buffer_queue->size); + } + mtx_unlock(&buffer_queue->mutex); + return true; +} + +void BufferQueueDestroy(struct BufferQueue* buffer_queue) { + for (size_t i = 0; i < buffer_queue->size; i++) + BufferQueueItemDestroy(buffer_queue->items[i]); + free(buffer_queue->items); + mtx_destroy(&buffer_queue->mutex); + free(buffer_queue); +} diff --git a/buffer_queue.h b/buffer_queue.h new file mode 100644 index 0000000..4597fc4 --- /dev/null +++ b/buffer_queue.h @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2024 Mikhail Burakov. This file is part of streamer. + * + * streamer is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * streamer is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with streamer. If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef STREAMER_BUFFER_QUEUE_H_ +#define STREAMER_BUFFER_QUEUE_H_ + +#include <stdbool.h> +#include <stddef.h> +#include <stdint.h> + +struct BufferQueue; + +struct BufferQueueItem { + size_t size; + uint8_t data[]; +}; + +struct BufferQueueItem* BufferQueueItemCreate(const void* data, size_t size); +void BufferQueueItemDestroy(struct BufferQueueItem* buffer_queue_item); + +struct BufferQueue* BufferQueueCreate(void); +bool BufferQueueQueue(struct BufferQueue* buffer_queue, + struct BufferQueueItem* buffer_queue_item); +bool BufferQueueDequeue(struct BufferQueue* buffer_queue, + struct BufferQueueItem** buffer_queue_item); +void BufferQueueDestroy(struct BufferQueue* buffer_queue); + +#endif // STREAMER_BUFFER_QUEUE_H_ @@ -26,6 +26,7 @@ #include <sys/socket.h> #include <unistd.h> +#include "audio.h" #include "capture.h" #include "colorspace.h" #include "encode.h" @@ -47,6 +48,7 @@ static void OnSignal(int status) { g_signal = status; } struct Contexts { bool disable_uhid; + struct AudioContext* audio_context; struct GpuContext* gpu_context; struct IoMuxer io_muxer; int server_fd; @@ -116,6 +118,16 @@ static void MaybeDropClient(struct Contexts* contexts) { } } +static void OnAudioContextAudioReady(void* user, const void* buffer, + size_t size) { + struct Contexts* contexts = user; + LOG("Got a buffer!"); + // TODO(mburakov): Implement this! + (void)contexts; + (void)buffer; + (void)size; +} + static void OnCaptureContextFrameReady(void* user, const struct GpuFrame* captured_frame) { struct Contexts* contexts = user; @@ -191,6 +203,22 @@ drop_client: MaybeDropClient(contexts); } +static void OnAudioContextEvents(void* user) { + struct Contexts* contexts = user; + if (!IoMuxerOnRead(&contexts->io_muxer, + AudioContextGetEventsFd(contexts->audio_context), + &OnAudioContextEvents, user)) { + LOG("Failed to reschedule audio io (%s)", strerror(errno)); + g_signal = SIGABRT; + return; + } + if (!AudioContextProcessEvents(contexts->audio_context)) { + LOG("Failed to process audio events"); + g_signal = SIGABRT; + return; + } +} + static void OnCaptureContextEvents(void* user) { struct Contexts* contexts = user; if (!IoMuxerOnRead(&contexts->io_muxer, @@ -294,22 +322,42 @@ int main(int argc, char* argv[]) { contexts.disable_uhid = true; } } + + static struct AudioContextCallbacks kAudioContextCallbacks = { + .OnAudioReady = OnAudioContextAudioReady, + }; + contexts.audio_context = + AudioContextCreate(&kAudioContextCallbacks, &contexts); + if (!contexts.audio_context) { + LOG("Failed to create audio context"); + return EXIT_FAILURE; + } + contexts.gpu_context = GpuContextCreate(colorspace, range); if (!contexts.gpu_context) { LOG("Failed to create gpu context"); - return EXIT_FAILURE; + goto rollback_audio_context; } + IoMuxerCreate(&contexts.io_muxer); contexts.server_fd = CreateServerSocket(argv[1]); if (contexts.server_fd == -1) { LOG("Failed to create server socket"); goto rollback_io_muxer; } + + if (!IoMuxerOnRead(&contexts.io_muxer, + AudioContextGetEventsFd(contexts.audio_context), + &OnAudioContextEvents, &contexts)) { + LOG("Failed to schedule audio io (%s)", strerror(errno)); + goto rollback_server_fd; + } if (!IoMuxerOnRead(&contexts.io_muxer, contexts.server_fd, &OnClientConnecting, &contexts)) { LOG("Failed to schedule accept (%s)", strerror(errno)); goto rollback_server_fd; } + while (!g_signal) { if (IoMuxerIterate(&contexts.io_muxer, -1) && errno != EINTR) { LOG("Failed to iterate io muxer (%s)", strerror(errno)); @@ -327,6 +375,8 @@ rollback_server_fd: rollback_io_muxer: IoMuxerDestroy(&contexts.io_muxer); GpuContextDestroy(contexts.gpu_context); +rollback_audio_context: + AudioContextDestroy(contexts.audio_context); bool result = g_signal == SIGINT || g_signal == SIGTERM; return result ? EXIT_SUCCESS : EXIT_FAILURE; } @@ -12,6 +12,7 @@ libs:=\ gbm \ glesv2 \ libdrm \ + libpipewire-0.3 \ libva \ libva-drm |