summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--atomic_queue.c81
-rw-r--r--atomic_queue.h40
-rw-r--r--audio.c131
-rw-r--r--audio.h31
-rw-r--r--main.c44
-rw-r--r--makefile1
6 files changed, 325 insertions, 3 deletions
diff --git a/atomic_queue.c b/atomic_queue.c
new file mode 100644
index 0000000..898db3e
--- /dev/null
+++ b/atomic_queue.c
@@ -0,0 +1,81 @@
+/*
+ * Copyright (C) 2024 Mikhail Burakov. This file is part of receiver.
+ *
+ * receiver is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * receiver is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with receiver. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "atomic_queue.h"
+
+#include <stdint.h>
+#include <stdlib.h>
+#include <string.h>
+
+static size_t min(size_t a, size_t b) { return a < b ? a : b; }
+static size_t min3(size_t a, size_t b, size_t c) { return min(min(a, b), c); }
+
+bool AtomicQueueCreate(struct AtomicQueue* atomic_queue, size_t alloc) {
+ void* buffer = malloc(alloc);
+ if (!buffer) return false;
+ *atomic_queue = (struct AtomicQueue){
+ .buffer = buffer,
+ .alloc = alloc,
+ };
+ atomic_init(&atomic_queue->size, 0);
+ return true;
+}
+
+size_t AtomicQueueWrite(struct AtomicQueue* atomic_queue, const void* buffer,
+ size_t size) {
+ size_t capacity =
+ atomic_queue->alloc -
+ atomic_load_explicit(&atomic_queue->size, memory_order_acquire);
+
+ size_t tail_size = atomic_queue->alloc - atomic_queue->write;
+ size_t copy_size = min3(size, capacity, tail_size);
+ memcpy((uint8_t*)atomic_queue->buffer + atomic_queue->write, buffer,
+ copy_size);
+
+ size_t offset = copy_size;
+ copy_size = min(size - copy_size, capacity - copy_size);
+ memcpy(atomic_queue->buffer, (const uint8_t*)buffer + offset, copy_size);
+
+ offset += copy_size;
+ atomic_queue->write = (atomic_queue->write + offset) % atomic_queue->alloc;
+ atomic_fetch_add_explicit(&atomic_queue->size, offset, memory_order_release);
+ return offset;
+}
+
+size_t AtomicQueueRead(struct AtomicQueue* atomic_queue, void* buffer,
+ size_t size) {
+ size_t avail =
+ atomic_load_explicit(&atomic_queue->size, memory_order_acquire);
+
+ size_t tail_size = atomic_queue->alloc - atomic_queue->read;
+ size_t copy_size = min3(size, avail, tail_size);
+ memcpy(buffer, (const uint8_t*)atomic_queue->buffer + atomic_queue->read,
+ copy_size);
+
+ size_t offset = copy_size;
+ copy_size = min(size - copy_size, avail - copy_size);
+ memcpy((uint8_t*)buffer + offset, atomic_queue->buffer, copy_size);
+
+ offset += copy_size;
+ atomic_queue->read = (atomic_queue->read + offset) % atomic_queue->alloc;
+ atomic_fetch_sub_explicit(&atomic_queue->size, offset, memory_order_release);
+ return offset;
+}
+
+void AtomicQueueDestroy(struct AtomicQueue* atomic_queue) {
+ free(atomic_queue->buffer);
+}
diff --git a/atomic_queue.h b/atomic_queue.h
new file mode 100644
index 0000000..50bb66d
--- /dev/null
+++ b/atomic_queue.h
@@ -0,0 +1,40 @@
+/*
+ * Copyright (C) 2024 Mikhail Burakov. This file is part of receiver.
+ *
+ * receiver is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * receiver is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with receiver. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#ifndef ATOMIC_QUEUE_H_
+#define ATOMIC_QUEUE_H_
+
+#include <stdatomic.h>
+#include <stdbool.h>
+#include <stddef.h>
+
+struct AtomicQueue {
+ void* buffer;
+ size_t alloc;
+ size_t read;
+ size_t write;
+ atomic_size_t size;
+};
+
+bool AtomicQueueCreate(struct AtomicQueue* atomic_queue, size_t alloc);
+size_t AtomicQueueWrite(struct AtomicQueue* atomic_queue, const void* buffer,
+ size_t size);
+size_t AtomicQueueRead(struct AtomicQueue* atomic_queue, void* buffer,
+ size_t size);
+void AtomicQueueDestroy(struct AtomicQueue* atomic_queue);
+
+#endif // ATOMIC_QUEUE_H_
diff --git a/audio.c b/audio.c
new file mode 100644
index 0000000..bb5ef23
--- /dev/null
+++ b/audio.c
@@ -0,0 +1,131 @@
+/*
+ * Copyright (C) 2024 Mikhail Burakov. This file is part of receiver.
+ *
+ * receiver is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * receiver is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with receiver. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "audio.h"
+
+#include <alsa/asoundlib.h>
+#include <errno.h>
+#include <stdatomic.h>
+#include <stddef.h>
+#include <stdint.h>
+#include <string.h>
+#include <threads.h>
+
+#include "atomic_queue.h"
+#include "toolbox/utils.h"
+
+struct AudioContext {
+ const char* device;
+ atomic_bool running;
+ struct AtomicQueue queue;
+ thrd_t thread;
+};
+
+static int AudioContextThreadProc(void* arg) {
+ struct AudioContext* context = arg;
+
+ snd_pcm_t* pcm = NULL;
+ int err = snd_pcm_open(&pcm, context->device, SND_PCM_STREAM_PLAYBACK, 0);
+ if (err) {
+ LOG("Failed to open pcm (%s)", snd_strerror(err));
+ atomic_store_explicit(&context->running, 0, memory_order_relaxed);
+ return 0;
+ }
+
+ // TODO(mburakov): Read audio configuration from the server.
+ err = snd_pcm_set_params(pcm, SND_PCM_FORMAT_S16_LE,
+ SND_PCM_ACCESS_RW_INTERLEAVED, 2, 48000, 1, 10000);
+ if (err) {
+ LOG("Failed to set pcm params (%s)", snd_strerror(err));
+ atomic_store_explicit(&context->running, 0, memory_order_relaxed);
+ goto rollback_pcm;
+ }
+
+ while (atomic_load_explicit(&context->running, memory_order_relaxed)) {
+ // TODO(mburakov): Frame size depends on dynamic audio configuration.
+ static const unsigned frame_size = sizeof(int16_t) * 2;
+ uint8_t buffer[480 * frame_size];
+
+ size_t size = AtomicQueueRead(&context->queue, buffer, sizeof(buffer));
+ if (size < sizeof(buffer)) {
+ // LOG("Audio queue underflow!");
+ memset(buffer + size, 0, sizeof(buffer) - size);
+ }
+
+ for (snd_pcm_uframes_t offset = 0; offset < sizeof(buffer) / frame_size;) {
+ snd_pcm_sframes_t nframes =
+ snd_pcm_writei(pcm, buffer + offset * frame_size,
+ sizeof(buffer) / frame_size - offset);
+ if (nframes < 0) {
+ LOG("Failed to write pcm (%s)", snd_strerror((int)nframes));
+ atomic_store_explicit(&context->running, 0, memory_order_relaxed);
+ goto rollback_pcm;
+ }
+ offset += (snd_pcm_uframes_t)nframes;
+ }
+ }
+
+rollback_pcm:
+ snd_pcm_close(pcm);
+ return 0;
+}
+
+struct AudioContext* AudioContextCreate(const char* device) {
+ struct AudioContext* audio_context = malloc(sizeof(struct AudioContext));
+ if (!audio_context) {
+ LOG("Failed to allocate context (%s)", strerror(errno));
+ return NULL;
+ }
+
+ audio_context->device = device;
+ atomic_init(&audio_context->running, 1);
+ if (!AtomicQueueCreate(&audio_context->queue, 4800 * sizeof(int16_t) * 2)) {
+ LOG("Failed to create queue (%s)", strerror(errno));
+ goto rollback_context;
+ }
+
+ if (thrd_create(&audio_context->thread, AudioContextThreadProc,
+ audio_context) != thrd_success) {
+ LOG("Failed to create thread (%s)", strerror(errno));
+ goto rollback_queue;
+ }
+ return audio_context;
+
+rollback_queue:
+ AtomicQueueDestroy(&audio_context->queue);
+rollback_context:
+ free(audio_context);
+ return NULL;
+}
+
+bool AudioContextDecode(struct AudioContext* audio_context, const void* buffer,
+ size_t size) {
+ if (!atomic_load_explicit(&audio_context->running, memory_order_relaxed)) {
+ LOG("Audio thread was stopped early!");
+ return false;
+ }
+ if (AtomicQueueWrite(&audio_context->queue, buffer, size) < size)
+ LOG("Audio queue overflow!");
+ return true;
+}
+
+void AudioContextDestroy(struct AudioContext* audio_context) {
+ atomic_store_explicit(&audio_context->running, 0, memory_order_relaxed);
+ thrd_join(audio_context->thread, NULL);
+ AtomicQueueDestroy(&audio_context->queue);
+ free(audio_context);
+}
diff --git a/audio.h b/audio.h
new file mode 100644
index 0000000..6755b64
--- /dev/null
+++ b/audio.h
@@ -0,0 +1,31 @@
+/*
+ * Copyright (C) 2024 Mikhail Burakov. This file is part of receiver.
+ *
+ * receiver is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * receiver is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with receiver. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#ifndef RECEIVER_AUDIO_H_
+#define RECEIVER_AUDIO_H_
+
+#include <stdbool.h>
+#include <stddef.h>
+
+struct AudioContext;
+
+struct AudioContext* AudioContextCreate(const char* device);
+bool AudioContextDecode(struct AudioContext* audio_context, const void* buffer,
+ size_t size);
+void AudioContextDestroy(struct AudioContext* audio_context);
+
+#endif // RECEIVER_AUDIO_H_
diff --git a/main.c b/main.c
index 89f6938..16fd009 100644
--- a/main.c
+++ b/main.c
@@ -30,6 +30,7 @@
#include <sys/timerfd.h>
#include <unistd.h>
+#include "audio.h"
#include "decode.h"
#include "input.h"
#include "proto.h"
@@ -49,6 +50,7 @@ struct Context {
size_t overlay_height;
struct Overlay* overlay;
struct DecodeContext* decode_context;
+ struct AudioContext* audio_context;
struct Buffer buffer;
size_t video_bitstream;
@@ -142,7 +144,8 @@ static void GetMaxOverlaySize(size_t* width, size_t* height) {
*height = 4 + 12 * 3 + 4;
}
-static struct Context* ContextCreate(int sock, bool no_input, bool stats) {
+static struct Context* ContextCreate(int sock, bool no_input, bool stats,
+ const char* audio_device) {
struct Context* context = calloc(1, sizeof(struct Context));
if (!context) {
LOG("Failed to allocate context (%s)", strerror(errno));
@@ -190,8 +193,18 @@ static struct Context* ContextCreate(int sock, bool no_input, bool stats) {
LOG("Failed to create decode context");
goto rollback_overlay;
}
+
+ if (audio_device) {
+ context->audio_context = AudioContextCreate(audio_device);
+ if (!context->audio_context) {
+ LOG("Failed to create audio context");
+ goto rollback_decode_context;
+ }
+ }
return context;
+rollback_decode_context:
+ DecodeContextDestroy(context->decode_context);
rollback_overlay:
if (context->overlay) OverlayDestroy(context->overlay);
rollback_window:
@@ -284,6 +297,17 @@ static bool HandleVideoStream(struct Context* context) {
return true;
}
+static bool HandleAudioStream(struct Context* context) {
+ const struct Proto* proto = context->buffer.data;
+ if (!context->audio_context) return true;
+ if (!AudioContextDecode(context->audio_context, proto->data, proto->size)) {
+ LOG("Failed to decode incoming audio data");
+ return false;
+ }
+
+ return true;
+}
+
static bool DemuxProtoStream(int sock, struct Context* context) {
switch (BufferAppendFrom(&context->buffer, sock)) {
case -1:
@@ -313,6 +337,11 @@ again:
return false;
}
break;
+ case PROTO_TYPE_AUDIO:
+ if (!HandleAudioStream(context)) {
+ LOG("Failed to handle audio stream");
+ return false;
+ }
}
BufferDiscard(&context->buffer, sizeof(struct Proto) + proto->size);
@@ -344,6 +373,7 @@ static bool SendPingMessage(int sock, int timer_fd, struct Context* context) {
static void ContextDestroy(struct Context* context) {
BufferDestroy(&context->buffer);
+ if (context->audio_context) AudioContextDestroy(context->audio_context);
DecodeContextDestroy(context->decode_context);
if (context->overlay) OverlayDestroy(context->overlay);
WindowDestroy(context->window);
@@ -352,7 +382,8 @@ static void ContextDestroy(struct Context* context) {
int main(int argc, char* argv[]) {
if (argc < 2) {
- LOG("Usage: %s <ip>:<port> [--no-input] [--stats]", argv[0]);
+ LOG("Usage: %s <ip>:<port> [--no-input] [--stats] [--audio <device>]",
+ argv[0]);
return EXIT_FAILURE;
}
@@ -364,15 +395,22 @@ int main(int argc, char* argv[]) {
bool no_input = false;
bool stats = false;
+ const char* audio_device = NULL;
for (int i = 2; i < argc; i++) {
if (!strcmp(argv[i], "--no-input")) {
no_input = true;
} else if (!strcmp(argv[i], "--stats")) {
stats = true;
+ } else if (!strcmp(argv[i], "--audio")) {
+ audio_device = argv[++i];
+ if (i == argc) {
+ LOG("Audio argument requires a value");
+ return EXIT_FAILURE;
+ }
}
}
- struct Context* context = ContextCreate(sock, no_input, stats);
+ struct Context* context = ContextCreate(sock, no_input, stats, audio_device);
if (!context) {
LOG("Failed to create context");
goto rollback_socket;
diff --git a/makefile b/makefile
index 2eefc20..ee4b9b1 100644
--- a/makefile
+++ b/makefile
@@ -10,6 +10,7 @@ obj+=\
toolbox/perf.o
libs:=\
+ alsa \
libva \
libva-drm \
mfx \