/*
* Copyright (C) 2024 Mikhail Burakov. This file is part of streamer.
*
* streamer is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* streamer is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with streamer. If not, see <https://www.gnu.org/licenses/>.
*/
#include "audio_context.h"
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "io_context.h"
#include "proto.h"
#include "queue.h"
#include "util.h"
// State of one audio capture session. The object is reference counted via
// AudioContextRef()/AudioContextUnref().
struct AudioContext {
// Destination handed to IoContextWrite() for every captured audio proto.
struct IoContext* io_context;
// PipeWire thread loop created in AudioContextCreate().
struct pw_thread_loop* thread_loop;
// PipeWire stream whose buffers are consumed in OnStreamProcess().
struct pw_stream* stream;
// Locked around the queue accesses in ProtoDestroy() and
// AudioContextGetProto().
mtx_t mutex;
// Pool of ProtoImpl allocations pushed by ProtoDestroy() and popped for reuse
// by AudioContextGetProto().
struct Queue queue;
// Number of outstanding references; the context is freed when it drops to
// zero.
atomic_int refcount;
};
// Audio proto together with its backing storage, allocated as a single block
// of sizeof(struct ProtoImpl) + payload size.
struct ProtoImpl {
// Must stay the first member: ProtoDestroy() casts the struct Proto pointer
// back to struct ProtoImpl.
struct Proto proto;
// Storage that proto.header points to.
struct ProtoHeader header;
// Owning context; assigned in OnStreamProcess() together with a reference
// that ProtoDestroy() drops.
struct AudioContext* audio_context;
// Payload bytes that proto.data points to.
uint8_t data[];
};
// Takes one more reference on |audio_context| and returns the same pointer,
// so the call can be used directly inside an expression.
static struct AudioContext* AudioContextRef(
    struct AudioContext* audio_context) {
  atomic_int* counter = &audio_context->refcount;
  // The previous counter value is of no interest to the caller.
  (void)atomic_fetch_add_explicit(counter, 1, memory_order_relaxed);
  return audio_context;
}
// Drops one reference on |audio_context|. When the last reference goes away,
// every proto still parked in the queue is freed, the queue and the mutex are
// destroyed, and the context itself is released.
static void AudioContextUnref(struct AudioContext* audio_context) {
  // Release ordering publishes this thread's prior accesses to the context
  // before the counter is decremented.
  if (atomic_fetch_sub_explicit(&audio_context->refcount, 1,
                                memory_order_release) != 1) {
    return;
  }
  // Acquire fence pairs with the release decrements of all other owners, so
  // that their accesses happen-before the teardown below.
  atomic_thread_fence(memory_order_acquire);
  for (void* item; QueuePop(&audio_context->queue, &item); free(item));
  QueueDestroy(&audio_context->queue);
  mtx_destroy(&audio_context->mutex);
  free(audio_context);
}
// Destroy callback installed on audio protos. Instead of freeing the
// allocation right away, it is parked in the owning context's queue so that
// AudioContextGetProto() can reuse it. If the mutex cannot be locked or the
// push fails, the allocation is freed. In every case the reference that the
// proto held on its audio context is dropped.
static void ProtoDestroy(struct Proto* proto) {
  struct ProtoImpl* proto_impl = (void*)proto;
  // Keep a local copy: once queued, proto_impl may be picked up elsewhere.
  struct AudioContext* audio_context = proto_impl->audio_context;
  bool queued = false;
  if (mtx_lock(&audio_context->mutex) != thrd_success) {
    LOG("Failed to lock mutex (%s)", strerror(errno));
  } else {
    queued = QueuePush(&audio_context->queue, proto_impl);
    // The unlock must not live inside assert(): with NDEBUG defined the
    // expression would vanish and the mutex would stay locked forever.
    const int unlock_status = mtx_unlock(&audio_context->mutex);
    assert(unlock_status == thrd_success);
    (void)unlock_status;
    if (!queued) {
      LOG("Failed to queue proto");
    }
  }
  if (!queued) {
    free(proto_impl);
  }
  AudioContextUnref(audio_context);
}
// Returns a proto with room for |size| payload bytes, or NULL on failure.
//
// The context's queue is searched first: entries are popped one by one, the
// first one whose recorded header size equals |size| is returned, and every
// non-matching entry popped along the way is freed. If nothing matches, a new
// allocation is made and its embedded struct Proto is wired to the header and
// payload storage.
//
// The header contents and the audio_context member of the returned object are
// not set up here; the caller fills them in.
static struct ProtoImpl* AudioContextGetProto(
    struct AudioContext* audio_context, size_t size) {
  if (mtx_lock(&audio_context->mutex) != thrd_success) {
    LOG("Failed to lock mutex (%s)", strerror(errno));
    return NULL;
  }
  struct ProtoImpl* proto_impl = NULL;
  for (void* item; QueuePop(&audio_context->queue, &item); free(item)) {
    struct ProtoImpl* candidate = item;
    if (candidate->header.size == size) {
      // Leaving via break skips the free() in the loop increment.
      proto_impl = candidate;
      break;
    }
  }
  // The unlock must not live inside assert(): with NDEBUG defined the
  // expression would vanish and the mutex would stay locked forever.
  const int unlock_status = mtx_unlock(&audio_context->mutex);
  assert(unlock_status == thrd_success);
  (void)unlock_status;
  if (proto_impl) return proto_impl;

  proto_impl = malloc(sizeof(struct ProtoImpl) + size);
  if (!proto_impl) {
    LOG("Failed to allocate proto (%s)", strerror(errno));
    return NULL;
  }
  const struct Proto proto = {
      .Destroy = ProtoDestroy,
      .header = &proto_impl->header,
      .data = proto_impl->data,
  };
  // Copied bytewise into the leading struct Proto member of the allocation.
  memcpy(proto_impl, &proto, sizeof(proto));
  return proto_impl;
}
// Stream event callback: logs every state transition of the stream together
// with the accompanying error string.
static void OnStreamStateChanged(void* arg, enum pw_stream_state old,
                                 enum pw_stream_state state,
                                 const char* error) {
  (void)arg;
  // A null pointer must not reach a %s conversion, so substitute a
  // placeholder when no error string is supplied.
  LOG("Stream state change %s->%s, error is %s", pw_stream_state_as_string(old),
      pw_stream_state_as_string(state), error ? error : "(null)");
}
static void OnStreamParamChanged(void* arg, uint32_t id,
const struct spa_pod* param) {
(void)arg;
if (!param || id != SPA_PARAM_Format) return;
struct spa_audio_info audio_info;
if (spa_format_parse(param, &audio_info.media_type,
&audio_info.media_subtype) < 0) {
LOG("Failed to parse stream format");
return;
}
if (audio_info.media_type != SPA_MEDIA_TYPE_audio ||
audio_info.media_subtype != SPA_MEDIA_SUBTYPE_raw) {
LOG("Unexpected stream format");
return;
}
if (spa_format_audio_raw_parse(param, &audio_info.info.raw) < 0) {
LOG("Failed to parse stream raw format");
return;
}
LOG("Params changed to format=%u, rate=%u, channels=%u",
audio_info.info.raw.format, audio_info.info.raw.rate,
audio_info.info.raw.channels);
}
static void OnStreamProcess(void* arg) {
struct AudioContext* audio_context = arg;
struct pw_buffer* buffer = pw_stream_dequeue_buffer(audio_context->stream);
if (!buffer) {
LOG("Failed to dequeue stream buffer");
return;
}
struct ProtoHeader header = {
.type = kProtoTypeAudio,
.timestamp = buffer->time,
};
for (size_t index = 0; index < buffer->buffer->n_datas; index++)
header.size += buffer->buffer->datas[index].chunk->size;
struct ProtoImpl* proto_impl =
AudioContextGetProto(audio_context, header.size);
if (!proto_impl) {
LOG("Failed to get proto");
goto rollback_buffer;
}
proto_impl->header = header;
uint8_t* target = proto_impl->data;
for (size_t index = 0; index < buffer->buffer->n_datas; index++) {
const void* source = buffer->buffer->datas[index].data;
struct spa_chunk* chunk = buffer->buffer->datas[index].chunk;
memcpy(target, source + chunk->offset, chunk->size);
target += chunk->size;
}
proto_impl->audio_context = AudioContextRef(audio_context);
if (!IoContextWrite(audio_context->io_context, &proto_impl->proto)) {
LOG("Failed to write audio proto");
goto rollback_buffer;
}
rollback_buffer:
assert(!pw_stream_queue_buffer(audio_context->stream, buffer));
}
// Creates an audio context that captures audio through a PipeWire stream and
// writes the captured data to |io_context|.
//
// |proto_hello| must be a hello proto whose payload is exactly one
// struct spa_audio_info_raw describing the requested format. It is destroyed
// by this function on every path, success or failure.
//
// Returns a context holding one reference (release it with
// AudioContextDestroy()), or NULL on failure.
struct AudioContext* AudioContextCreate(struct IoContext* io_context,
                                        struct Proto* proto_hello) {
  assert(proto_hello->header->type == kProtoTypeHello);
  struct AudioContext* audio_context = malloc(sizeof(struct AudioContext));
  if (!audio_context) {
    LOG("Failed to allocate audio context (%s)", strerror(errno));
    goto rollback_proto_hello;
  }
  // Remaining members, including the refcount, start out zeroed.
  *audio_context = (struct AudioContext){
      .io_context = io_context,
  };
  // A zero-filled mtx_t is not a valid mutex as far as the C standard is
  // concerned; initialise it explicitly before anything can lock it.
  if (mtx_init(&audio_context->mutex, mtx_plain) != thrd_success) {
    LOG("Failed to init mutex");
    goto rollback_audio_context;
  }
  audio_context->thread_loop = pw_thread_loop_new("audio-capture", NULL);
  if (!audio_context->thread_loop) {
    LOG("Failed to create thread loop");
    goto rollback_mutex;
  }
  // The loop lock is held for the whole stream setup below.
  pw_thread_loop_lock(audio_context->thread_loop);
  if (pw_thread_loop_start(audio_context->thread_loop)) {
    LOG("Failed to start thread loop");
    goto rollback_thread_loop;
  }
  struct pw_properties* properties = pw_properties_new(
#define _(...) __VA_ARGS__
      _(PW_KEY_NODE_NAME, "streamer-sink"), _(PW_KEY_NODE_VIRTUAL, "true"),
      _(PW_KEY_MEDIA_CLASS, "Audio/Sink"), NULL
#undef _
  );
  if (!properties) {
    LOG("Failed to create properties");
    goto rollback_thread_loop;
  }
  static const struct pw_stream_events kStreamEvents = {
      .version = PW_VERSION_STREAM_EVENTS,
      .state_changed = OnStreamStateChanged,
      .param_changed = OnStreamParamChanged,
      .process = OnStreamProcess,
  };
  audio_context->stream = pw_stream_new_simple(
      pw_thread_loop_get_loop(audio_context->thread_loop), "audio-capture",
      properties, &kStreamEvents, audio_context);
  if (!audio_context->stream) {
    LOG("Failed to create stream");
    goto rollback_thread_loop;
  }
  uint8_t buffer[1024];
  struct spa_pod_builder builder = {
      .data = buffer,
      .size = sizeof(buffer),
  };
  // The payload is reinterpreted as struct spa_audio_info_raw below, so its
  // size has to match exactly.
  if (proto_hello->header->size != sizeof(struct spa_audio_info_raw)) {
    LOG("Invalid hello proto");
    goto rollback_stream;
  }
  const struct spa_pod* params[] = {
      spa_format_audio_raw_build(&builder, SPA_PARAM_EnumFormat,
                                 proto_hello->data),
  };
  static const enum pw_stream_flags kStreamFlags = (enum pw_stream_flags)(
      PW_STREAM_FLAG_AUTOCONNECT | PW_STREAM_FLAG_MAP_BUFFERS |
      PW_STREAM_FLAG_RT_PROCESS);
  if (pw_stream_connect(audio_context->stream, PW_DIRECTION_INPUT, PW_ID_ANY,
                        kStreamFlags, params, LENGTH(params))) {
    LOG("Failed to connect stream");
    goto rollback_stream;
  }
  pw_thread_loop_unlock(audio_context->thread_loop);
  proto_hello->Destroy(proto_hello);
  // The caller's reference is the first one taken on the context.
  return AudioContextRef(audio_context);

rollback_stream:
  pw_stream_destroy(audio_context->stream);
rollback_thread_loop:
  pw_thread_loop_unlock(audio_context->thread_loop);
  pw_thread_loop_destroy(audio_context->thread_loop);
rollback_mutex:
  mtx_destroy(&audio_context->mutex);
rollback_audio_context:
  free(audio_context);
rollback_proto_hello:
  proto_hello->Destroy(proto_hello);
  return NULL;
}
// Tears down the PipeWire side of |audio_context| and drops one reference on
// it. The stream is destroyed while the thread loop lock is held; the loop
// itself is destroyed after the lock has been released.
void AudioContextDestroy(struct AudioContext* audio_context) {
  struct pw_thread_loop* loop = audio_context->thread_loop;

  pw_thread_loop_lock(loop);
  pw_stream_destroy(audio_context->stream);
  pw_thread_loop_unlock(loop);

  pw_thread_loop_destroy(loop);
  AudioContextUnref(audio_context);
}