/* * Copyright (C) 2024 Mikhail Burakov. This file is part of streamer. * * streamer is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * streamer is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with streamer. If not, see . */ #include "audio.h" #include #include #include #include #include #include #include #include #include #include "buffer_queue.h" #include "toolbox/utils.h" #define STATUS_OK 0 #define STATUS_ERR 1 struct AudioContext { const struct AudioContextCallbacks* callbacks; void* user; int waker[2]; struct BufferQueue* buffer_queue; struct pw_thread_loop* pw_thread_loop; struct pw_stream* pw_stream; }; static void WakeClient(const struct AudioContext* audio_context, char status) { if (write(audio_context->waker[1], &status, sizeof(status)) != sizeof(status)) { // TODO(mburakov): Then what?.. abort(); } } static void OnStreamStateChanged(void* data, enum pw_stream_state old, enum pw_stream_state state, const char* error) { (void)data; LOG("Stream state change %s->%s, error is %s", pw_stream_state_as_string(old), pw_stream_state_as_string(state), error); } static void OnStreamParamChanged(void* data, uint32_t id, const struct spa_pod* param) { struct AudioContext* audio_context = data; if (param == NULL || id != SPA_PARAM_Format) return; struct spa_audio_info audio_info; if (spa_format_parse(param, &audio_info.media_type, &audio_info.media_subtype) < 0) { LOG("Failed to parse stream format"); goto failure; } if (audio_info.media_type != SPA_MEDIA_TYPE_audio || audio_info.media_subtype != SPA_MEDIA_SUBTYPE_raw) { LOG("Unexpected stream format"); goto failure; } if (spa_format_audio_raw_parse(param, &audio_info.info.raw) < 0) { LOG("Faield to parse audio stream format"); goto failure; } LOG("Capturing rate: %u, channels: %u", audio_info.info.raw.rate, audio_info.info.raw.channels); return; failure: pw_thread_loop_stop(audio_context->pw_thread_loop); WakeClient(audio_context, STATUS_ERR); } static void OnStreamProcess(void* data) { struct AudioContext* audio_context = data; struct pw_buffer* pw_buffer = pw_stream_dequeue_buffer(audio_context->pw_stream); if (!pw_buffer) { LOG("Failed to dequeue stream buffer"); goto failure; } for (uint32_t i = 0; i < pw_buffer->buffer->n_datas; i++) { const struct spa_data* spa_data = pw_buffer->buffer->datas + i; const void* buffer = (const uint8_t*)spa_data->data + spa_data->chunk->offset % spa_data->maxsize; uint32_t size = MIN(spa_data->chunk->size, spa_data->maxsize); struct BufferQueueItem* buffer_queue_item = BufferQueueItemCreate(buffer, size); if (!buffer_queue_item) { LOG("Failed to copy stream buffer"); goto failure; } if (!BufferQueueQueue(audio_context->buffer_queue, buffer_queue_item)) { LOG("Failed to queue stream buffer copy"); BufferQueueItemDestroy(buffer_queue_item); goto failure; } } pw_stream_queue_buffer(audio_context->pw_stream, pw_buffer); WakeClient(audio_context, STATUS_OK); return; failure: pw_thread_loop_stop(audio_context->pw_thread_loop); WakeClient(audio_context, STATUS_ERR); } struct AudioContext* AudioContextCreate( const struct AudioContextCallbacks* callbacks, void* user) { pw_init(0, NULL); struct AudioContext* audio_context = malloc(sizeof(struct AudioContext)); if (!audio_context) { LOG("Failed to allocate audio context (%s)", strerror(errno)); return NULL; } *audio_context = (struct AudioContext){ .callbacks = callbacks, .user = user, }; if (pipe(audio_context->waker)) { LOG("Failed to create pipe (%s)", strerror(errno)); goto rollback_audio_context; } audio_context->buffer_queue = BufferQueueCreate(); if (!audio_context->buffer_queue) { LOG("Failed to create buffer queue (%s)", strerror(errno)); goto rollback_waker; } audio_context->pw_thread_loop = pw_thread_loop_new("audio-capture", NULL); if (!audio_context->pw_thread_loop) { LOG("Failed to create pipewire thread loop"); goto rollback_buffer_queue; } pw_thread_loop_lock(audio_context->pw_thread_loop); int err = pw_thread_loop_start(audio_context->pw_thread_loop); if (err) { LOG("Failed to start pipewire thread loop (%s)", spa_strerror(err)); pw_thread_loop_unlock(audio_context->pw_thread_loop); goto rollback_thread_loop; } // TOOD(mburakov): Read these from the commandline? struct pw_properties* pw_properties = pw_properties_new( #define _(...) __VA_ARGS__ _(PW_KEY_AUDIO_FORMAT, "S16LE"), _(PW_KEY_AUDIO_RATE, "48000"), _(PW_KEY_AUDIO_CHANNELS, "2"), _(SPA_KEY_AUDIO_POSITION, "[ FL FR ]"), _(PW_KEY_NODE_NAME, "streamer-sink"), _(PW_KEY_NODE_VIRTUAL, "true"), _(PW_KEY_MEDIA_CLASS, "Audio/Sink"), NULL #undef _ ); if (!pw_properties) { LOG("Failed to create pipewire properties"); pw_thread_loop_unlock(audio_context->pw_thread_loop); goto rollback_thread_loop; } static const struct pw_stream_events kPwStreamEvents = { .version = PW_VERSION_STREAM_EVENTS, .state_changed = OnStreamStateChanged, .param_changed = OnStreamParamChanged, .process = OnStreamProcess, }; audio_context->pw_stream = pw_stream_new_simple( pw_thread_loop_get_loop(audio_context->pw_thread_loop), "audio-capture", pw_properties, &kPwStreamEvents, audio_context); if (!audio_context->pw_stream) { LOG("Failed to create pipewire stream"); pw_thread_loop_unlock(audio_context->pw_thread_loop); goto rollback_thread_loop; } uint8_t buffer[1024]; struct spa_pod_builder spa_pod_builder = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer)); const struct spa_pod* params[] = { spa_format_audio_raw_build( &spa_pod_builder, SPA_PARAM_EnumFormat, &SPA_AUDIO_INFO_RAW_INIT(.format = SPA_AUDIO_FORMAT_S16_LE, .rate = 48000, .channels = 2, .position = {SPA_AUDIO_CHANNEL_FL, SPA_AUDIO_CHANNEL_FR})), }; static const enum pw_stream_flags kPwStreamFlags = PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_MAP_BUFFERS | PW_STREAM_FLAG_RT_PROCESS; if (pw_stream_connect(audio_context->pw_stream, PW_DIRECTION_INPUT, PW_ID_ANY, kPwStreamFlags, params, LENGTH(params))) { LOG("Failed to connect pipewire stream"); pw_stream_destroy(audio_context->pw_stream); pw_thread_loop_unlock(audio_context->pw_thread_loop); goto rollback_thread_loop; } pw_thread_loop_unlock(audio_context->pw_thread_loop); return audio_context; rollback_thread_loop: pw_thread_loop_destroy(audio_context->pw_thread_loop); rollback_buffer_queue: BufferQueueDestroy(audio_context->buffer_queue); rollback_waker: close(audio_context->waker[1]); close(audio_context->waker[0]); rollback_audio_context: free(audio_context); pw_deinit(); return NULL; } int AudioContextGetEventsFd(struct AudioContext* audio_context) { return audio_context->waker[0]; } bool AudioContextProcessEvents(struct AudioContext* audio_context) { char status; if (read(audio_context->waker[0], &status, sizeof(status)) != sizeof(status)) { // TODO(mburakov): Then what?.. abort(); } switch (status) { case STATUS_OK: break; case STATUS_ERR: LOG("Error reported from audio thread"); return false; default: __builtin_unreachable(); } for (;;) { struct BufferQueueItem* buffer_queue_item; if (!BufferQueueDequeue(audio_context->buffer_queue, &buffer_queue_item)) { LOG("Failed to dequeue stream buffer copy"); return false; } if (!buffer_queue_item) return true; audio_context->callbacks->OnAudioReady( audio_context->user, buffer_queue_item->data, buffer_queue_item->size); BufferQueueItemDestroy(buffer_queue_item); } } void AudioContextDestroy(struct AudioContext* audio_context) { pw_thread_loop_lock(audio_context->pw_thread_loop); pw_stream_destroy(audio_context->pw_stream); pw_thread_loop_unlock(audio_context->pw_thread_loop); pw_thread_loop_destroy(audio_context->pw_thread_loop); BufferQueueDestroy(audio_context->buffer_queue); close(audio_context->waker[1]); close(audio_context->waker[0]); free(audio_context); pw_deinit(); }