diff options
author | Mikhail Burakov <mburakov@mailbox.org> | 2024-05-09 14:47:46 +0200 |
---|---|---|
committer | Mikhail Burakov <mburakov@mailbox.org> | 2024-05-09 14:47:46 +0200 |
commit | 918682faa29197fe3b0a544906638e02785a6b17 (patch) | |
tree | b183de608318581462574f78a3e95a18b3ef0a3d /audio.c | |
parent | 7d3616cdb75194d9ec72744af6ee4c7b8276b746 (diff) |
Initial audio capturing via virtual pipewire sink
Diffstat (limited to 'audio.c')
-rw-r--r-- | audio.c | 279 |
1 files changed, 279 insertions, 0 deletions
@@ -0,0 +1,279 @@ +/* + * Copyright (C) 2024 Mikhail Burakov. This file is part of streamer. + * + * streamer is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * streamer is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with streamer. If not, see <https://www.gnu.org/licenses/>. + */ + +#include "audio.h" + +#include <errno.h> +#include <pipewire/pipewire.h> +#include <spa/param/audio/raw-utils.h> +#include <spa/utils/result.h> +#include <stdint.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> + +#include "buffer_queue.h" +#include "toolbox/utils.h" + +#define STATUS_OK 0 +#define STATUS_ERR 1 + +struct AudioContext { + const struct AudioContextCallbacks* callbacks; + void* user; + + int waker[2]; + struct BufferQueue* buffer_queue; + struct pw_thread_loop* pw_thread_loop; + struct pw_stream* pw_stream; +}; + +static void WakeClient(const struct AudioContext* audio_context, char status) { + if (write(audio_context->waker[1], &status, sizeof(status)) != + sizeof(status)) { + // TODO(mburakov): Then what?.. + abort(); + } +} + +static void OnStreamStateChanged(void* data, enum pw_stream_state old, + enum pw_stream_state state, + const char* error) { + (void)data; + LOG("Stream state change %s->%s, error is %s", pw_stream_state_as_string(old), + pw_stream_state_as_string(state), error); +} + +static void OnStreamParamChanged(void* data, uint32_t id, + const struct spa_pod* param) { + struct AudioContext* audio_context = data; + if (param == NULL || id != SPA_PARAM_Format) return; + + struct spa_audio_info audio_info; + if (spa_format_parse(param, &audio_info.media_type, + &audio_info.media_subtype) < 0) { + LOG("Failed to parse stream format"); + goto failure; + } + if (audio_info.media_type != SPA_MEDIA_TYPE_audio || + audio_info.media_subtype != SPA_MEDIA_SUBTYPE_raw) { + LOG("Unexpected stream format"); + goto failure; + } + if (spa_format_audio_raw_parse(param, &audio_info.info.raw) < 0) { + LOG("Faield to parse audio stream format"); + goto failure; + } + + LOG("Capturing rate: %u, channels: %u", audio_info.info.raw.rate, + audio_info.info.raw.channels); + return; + +failure: + pw_thread_loop_stop(audio_context->pw_thread_loop); + WakeClient(audio_context, STATUS_ERR); +} + +static void OnStreamProcess(void* data) { + struct AudioContext* audio_context = data; + struct pw_buffer* pw_buffer = + pw_stream_dequeue_buffer(audio_context->pw_stream); + if (!pw_buffer) { + LOG("Failed to dequeue stream buffer"); + goto failure; + } + + for (uint32_t i = 0; i < pw_buffer->buffer->n_datas; i++) { + const struct spa_data* spa_data = pw_buffer->buffer->datas + i; + const void* buffer = (const uint8_t*)spa_data->data + + spa_data->chunk->offset % spa_data->maxsize; + uint32_t size = MIN(spa_data->chunk->size, spa_data->maxsize); + struct BufferQueueItem* buffer_queue_item = + BufferQueueItemCreate(buffer, size); + if (!buffer_queue_item) { + LOG("Failed to copy stream buffer"); + goto failure; + } + if (!BufferQueueQueue(audio_context->buffer_queue, buffer_queue_item)) { + LOG("Failed to queue stream buffer copy"); + BufferQueueItemDestroy(buffer_queue_item); + goto failure; + } + } + + pw_stream_queue_buffer(audio_context->pw_stream, pw_buffer); + WakeClient(audio_context, STATUS_OK); + return; + +failure: + pw_thread_loop_stop(audio_context->pw_thread_loop); + WakeClient(audio_context, STATUS_ERR); +} + +struct AudioContext* AudioContextCreate( + const struct AudioContextCallbacks* callbacks, void* user) { + pw_init(0, NULL); + struct AudioContext* audio_context = malloc(sizeof(struct AudioContext)); + if (!audio_context) { + LOG("Failed to allocate audio context (%s)", strerror(errno)); + return NULL; + } + *audio_context = (struct AudioContext){ + .callbacks = callbacks, + .user = user, + }; + + if (pipe(audio_context->waker)) { + LOG("Failed to create pipe (%s)", strerror(errno)); + goto rollback_audio_context; + } + + audio_context->buffer_queue = BufferQueueCreate(); + if (!audio_context->buffer_queue) { + LOG("Failed to create buffer queue (%s)", strerror(errno)); + goto rollback_waker; + } + + audio_context->pw_thread_loop = pw_thread_loop_new("audio-capture", NULL); + if (!audio_context->pw_thread_loop) { + LOG("Failed to create pipewire thread loop"); + goto rollback_buffer_queue; + } + + pw_thread_loop_lock(audio_context->pw_thread_loop); + int err = pw_thread_loop_start(audio_context->pw_thread_loop); + if (err) { + LOG("Failed to start pipewire thread loop (%s)", spa_strerror(err)); + pw_thread_loop_unlock(audio_context->pw_thread_loop); + goto rollback_thread_loop; + } + + // TOOD(mburakov): Read these from the commandline? + struct pw_properties* pw_properties = pw_properties_new( +#define _(...) __VA_ARGS__ + _(PW_KEY_AUDIO_FORMAT, "S16LE"), _(PW_KEY_AUDIO_RATE, "48000"), + _(PW_KEY_AUDIO_CHANNELS, "2"), _(SPA_KEY_AUDIO_POSITION, "[ FL FR ]"), + _(PW_KEY_NODE_NAME, "streamer-sink"), _(PW_KEY_NODE_VIRTUAL, "true"), + _(PW_KEY_MEDIA_CLASS, "Audio/Sink"), NULL +#undef _ + ); + if (!pw_properties) { + LOG("Failed to create pipewire properties"); + pw_thread_loop_unlock(audio_context->pw_thread_loop); + goto rollback_thread_loop; + } + + static const struct pw_stream_events kPwStreamEvents = { + .version = PW_VERSION_STREAM_EVENTS, + .state_changed = OnStreamStateChanged, + .param_changed = OnStreamParamChanged, + .process = OnStreamProcess, + }; + audio_context->pw_stream = pw_stream_new_simple( + pw_thread_loop_get_loop(audio_context->pw_thread_loop), "audio-capture", + pw_properties, &kPwStreamEvents, audio_context); + if (!audio_context->pw_stream) { + LOG("Failed to create pipewire stream"); + pw_thread_loop_unlock(audio_context->pw_thread_loop); + goto rollback_thread_loop; + } + + uint8_t buffer[1024]; + struct spa_pod_builder spa_pod_builder = + SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); + const struct spa_pod* params[] = { + spa_format_audio_raw_build( + &spa_pod_builder, SPA_PARAM_EnumFormat, + &SPA_AUDIO_INFO_RAW_INIT(.format = SPA_AUDIO_FORMAT_S16_LE, + .rate = 48000, .channels = 2, + .position = {SPA_AUDIO_CHANNEL_FL, + SPA_AUDIO_CHANNEL_FR})), + }; + static const enum pw_stream_flags kPwStreamFlags = + PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_MAP_BUFFERS | + PW_STREAM_FLAG_RT_PROCESS; + if (pw_stream_connect(audio_context->pw_stream, PW_DIRECTION_INPUT, PW_ID_ANY, + kPwStreamFlags, params, LENGTH(params))) { + LOG("Failed to connect pipewire stream"); + pw_stream_destroy(audio_context->pw_stream); + pw_thread_loop_unlock(audio_context->pw_thread_loop); + goto rollback_thread_loop; + } + + pw_thread_loop_unlock(audio_context->pw_thread_loop); + return audio_context; + +rollback_thread_loop: + pw_thread_loop_destroy(audio_context->pw_thread_loop); +rollback_buffer_queue: + BufferQueueDestroy(audio_context->buffer_queue); +rollback_waker: + close(audio_context->waker[1]); + close(audio_context->waker[0]); +rollback_audio_context: + free(audio_context); + pw_deinit(); + return NULL; +} + +int AudioContextGetEventsFd(struct AudioContext* audio_context) { + return audio_context->waker[0]; +} + +bool AudioContextProcessEvents(struct AudioContext* audio_context) { + char status; + if (read(audio_context->waker[0], &status, sizeof(status)) != + sizeof(status)) { + // TODO(mburakov): Then what?.. + abort(); + } + + switch (status) { + case STATUS_OK: + break; + case STATUS_ERR: + LOG("Error reported from audio thread"); + return false; + default: + __builtin_unreachable(); + } + + for (;;) { + struct BufferQueueItem* buffer_queue_item; + if (!BufferQueueDequeue(audio_context->buffer_queue, &buffer_queue_item)) { + LOG("Failed to dequeue stream buffer copy"); + return false; + } + if (!buffer_queue_item) return true; + audio_context->callbacks->OnAudioReady( + audio_context->user, buffer_queue_item->data, buffer_queue_item->size); + BufferQueueItemDestroy(buffer_queue_item); + } +} + +void AudioContextDestroy(struct AudioContext* audio_context) { + pw_thread_loop_lock(audio_context->pw_thread_loop); + pw_stream_destroy(audio_context->pw_stream); + pw_thread_loop_unlock(audio_context->pw_thread_loop); + pw_thread_loop_destroy(audio_context->pw_thread_loop); + BufferQueueDestroy(audio_context->buffer_queue); + close(audio_context->waker[1]); + close(audio_context->waker[0]); + free(audio_context); + pw_deinit(); +} |