/* * Copyright (C) 2024 Mikhail Burakov. This file is part of streamer. * * streamer is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * streamer is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with streamer. If not, see . */ #include "audio_context.h" #include #include #include #include #include #include #include #include #include #include "io_context.h" #include "proto.h" #include "queue.h" #include "util.h" struct AudioContext { struct IoContext* io_context; struct pw_thread_loop* thread_loop; struct pw_stream* stream; mtx_t mutex; struct Queue queue; atomic_int refcount; }; struct ProtoImpl { struct Proto proto; struct ProtoHeader header; struct AudioContext* audio_context; uint8_t data[]; }; static struct AudioContext* AudioContextRef( struct AudioContext* audio_context) { atomic_fetch_add_explicit(&audio_context->refcount, 1, memory_order_relaxed); return audio_context; } static void AudioContextUnref(struct AudioContext* audio_context) { if (atomic_fetch_sub_explicit(&audio_context->refcount, 1, memory_order_relaxed) == 1) { for (void* item; QueuePop(&audio_context->queue, &item); free(item)); QueueDestroy(&audio_context->queue); free(audio_context); } } static void ProtoDestroy(struct Proto* proto) { struct ProtoImpl* proto_impl = (void*)proto; struct AudioContext* audio_context = proto_impl->audio_context; if (mtx_lock(&audio_context->mutex) != thrd_success) { LOG("Failed to lock mutex (%s)", strerror(errno)); goto rollback_proto_impl; } bool result = QueuePush(&audio_context->queue, proto_impl); assert(mtx_unlock(&audio_context->mutex) == thrd_success); if (!result) { LOG("Failed to queue proto"); goto rollback_proto_impl; } AudioContextUnref(audio_context); return; rollback_proto_impl: free(proto_impl); AudioContextUnref(audio_context); } static struct ProtoImpl* AudioContextGetProto( struct AudioContext* audio_context, size_t size) { if (mtx_lock(&audio_context->mutex) != thrd_success) { LOG("Failed to lock mutex (%s)", strerror(errno)); return NULL; } struct ProtoImpl* proto_impl = NULL; for (void* item; QueuePop(&audio_context->queue, &item); free(item)) { struct ProtoImpl* result = item; if (result->header.size == size) { proto_impl = result; break; } } assert(mtx_unlock(&audio_context->mutex) == thrd_success); if (proto_impl) return proto_impl; proto_impl = malloc(sizeof(struct ProtoImpl) + size); if (!proto_impl) { LOG("Failed to allocate proto (%s)", strerror(errno)); return NULL; } const struct Proto proto = { .Destroy = ProtoDestroy, .header = &proto_impl->header, .data = proto_impl->data, }; memcpy(proto_impl, &proto, sizeof(proto)); return proto_impl; } static void OnStreamStateChanged(void* arg, enum pw_stream_state old, enum pw_stream_state state, const char* error) { (void)arg; LOG("Stream state change %s->%s, error is %s", pw_stream_state_as_string(old), pw_stream_state_as_string(state), error); } static void OnStreamParamChanged(void* arg, uint32_t id, const struct spa_pod* param) { (void)arg; if (!param || id != SPA_PARAM_Format) return; struct spa_audio_info audio_info; if (spa_format_parse(param, &audio_info.media_type, &audio_info.media_subtype) < 0) { LOG("Failed to parse stream format"); return; } if (audio_info.media_type != SPA_MEDIA_TYPE_audio || audio_info.media_subtype != SPA_MEDIA_SUBTYPE_raw) { LOG("Unexpected stream format"); return; } if (spa_format_audio_raw_parse(param, &audio_info.info.raw) < 0) { LOG("Failed to parse stream raw format"); return; } LOG("Params changed to format=%u, rate=%u, channels=%u", audio_info.info.raw.format, audio_info.info.raw.rate, audio_info.info.raw.channels); } static void OnStreamProcess(void* arg) { struct AudioContext* audio_context = arg; struct pw_buffer* buffer = pw_stream_dequeue_buffer(audio_context->stream); if (!buffer) { LOG("Failed to dequeue stream buffer"); return; } struct ProtoHeader header = { .type = kProtoTypeAudio, .timestamp = buffer->time, }; for (size_t index = 0; index < buffer->buffer->n_datas; index++) header.size += buffer->buffer->datas[index].chunk->size; struct ProtoImpl* proto_impl = AudioContextGetProto(audio_context, header.size); if (!proto_impl) { LOG("Failed to get proto"); goto rollback_buffer; } proto_impl->header = header; uint8_t* target = proto_impl->data; for (size_t index = 0; index < buffer->buffer->n_datas; index++) { const void* source = buffer->buffer->datas[index].data; struct spa_chunk* chunk = buffer->buffer->datas[index].chunk; memcpy(target, source + chunk->offset, chunk->size); target += chunk->size; } proto_impl->audio_context = AudioContextRef(audio_context); if (!IoContextWrite(audio_context->io_context, &proto_impl->proto)) { LOG("Failed to write audio proto"); goto rollback_buffer; } rollback_buffer: assert(!pw_stream_queue_buffer(audio_context->stream, buffer)); } struct AudioContext* AudioContextCreate(struct IoContext* io_context, struct Proto* proto_hello) { assert(proto_hello->header->type == kProtoTypeHello); struct AudioContext* audio_context = malloc(sizeof(struct AudioContext)); if (!audio_context) { LOG("Failed to allocate audio context (%s)", strerror(errno)); goto rollback_proto_hello; } *audio_context = (struct AudioContext){ .io_context = io_context, .thread_loop = pw_thread_loop_new("audio-capture", NULL), }; if (!audio_context->thread_loop) { LOG("Failed to create thread loop"); goto rollback_audio_context; } pw_thread_loop_lock(audio_context->thread_loop); if (pw_thread_loop_start(audio_context->thread_loop)) { LOG("Failed to start thread loop"); goto rollback_thread_loop; } struct pw_properties* properties = pw_properties_new( #define _(...) __VA_ARGS__ _(PW_KEY_NODE_NAME, "streamer-sink"), _(PW_KEY_NODE_VIRTUAL, "true"), _(PW_KEY_MEDIA_CLASS, "Audio/Sink"), NULL #undef _ ); if (!properties) { LOG("Failed to create properties"); goto rollback_thread_loop; } static const struct pw_stream_events kStreamEvents = { .version = PW_VERSION_STREAM_EVENTS, .state_changed = OnStreamStateChanged, .param_changed = OnStreamParamChanged, .process = OnStreamProcess, }; audio_context->stream = pw_stream_new_simple( pw_thread_loop_get_loop(audio_context->thread_loop), "audio-capture", properties, &kStreamEvents, audio_context); if (!audio_context->stream) { LOG("Failed to create stream"); goto rollback_thread_loop; } uint8_t buffer[1024]; struct spa_pod_builder builder = { .data = buffer, .size = sizeof(buffer), }; if (proto_hello->header->size != sizeof(struct spa_audio_info_raw)) { LOG("Invalid hello proto"); goto rollback_stream; } const struct spa_pod* params[] = { spa_format_audio_raw_build(&builder, SPA_PARAM_EnumFormat, proto_hello->data), }; static const enum pw_stream_flags kStreamFlags = (enum pw_stream_flags)( PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_MAP_BUFFERS | PW_STREAM_FLAG_RT_PROCESS); if (pw_stream_connect(audio_context->stream, PW_DIRECTION_INPUT, PW_ID_ANY, kStreamFlags, params, LENGTH(params))) { LOG("Failed to connect stream"); goto rollback_stream; } pw_thread_loop_unlock(audio_context->thread_loop); proto_hello->Destroy(proto_hello); return AudioContextRef(audio_context); rollback_stream: pw_stream_destroy(audio_context->stream); rollback_thread_loop: pw_thread_loop_unlock(audio_context->thread_loop); pw_thread_loop_destroy(audio_context->thread_loop); rollback_audio_context: free(audio_context); rollback_proto_hello: proto_hello->Destroy(proto_hello); return NULL; } void AudioContextDestroy(struct AudioContext* audio_context) { pw_thread_loop_lock(audio_context->thread_loop); pw_stream_destroy(audio_context->stream); pw_thread_loop_unlock(audio_context->thread_loop); pw_thread_loop_destroy(audio_context->thread_loop); AudioContextUnref(audio_context); }