diff options
| author | Mikhail Burakov <mburakov@mailbox.org> | 2024-05-05 12:51:07 +0200 | 
|---|---|---|
| committer | Mikhail Burakov <mburakov@mailbox.org> | 2024-05-19 10:15:07 +0200 | 
| commit | f593c2fd65cab3275c98c100c6b7d775e24157f9 (patch) | |
| tree | 11891ecf9584594896ccb64b8cc91dafcdee23b6 | |
| parent | f5df0d289d1a06b325b70f9e1c2083ce6080d98a (diff) | |
Initial primitive audio pipeline implementation
| -rw-r--r-- | atomic_queue.c | 81 | ||||
| -rw-r--r-- | atomic_queue.h | 40 | ||||
| -rw-r--r-- | audio.c | 131 | ||||
| -rw-r--r-- | audio.h | 31 | ||||
| -rw-r--r-- | main.c | 44 | ||||
| -rw-r--r-- | makefile | 1 | 
6 files changed, 325 insertions, 3 deletions
| diff --git a/atomic_queue.c b/atomic_queue.c new file mode 100644 index 0000000..898db3e --- /dev/null +++ b/atomic_queue.c @@ -0,0 +1,81 @@ +/* + * Copyright (C) 2024 Mikhail Burakov. This file is part of receiver. + * + * receiver is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * receiver is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with receiver.  If not, see <https://www.gnu.org/licenses/>. + */ + +#include "atomic_queue.h" + +#include <stdint.h> +#include <stdlib.h> +#include <string.h> + +static size_t min(size_t a, size_t b) { return a < b ? a : b; } +static size_t min3(size_t a, size_t b, size_t c) { return min(min(a, b), c); } + +bool AtomicQueueCreate(struct AtomicQueue* atomic_queue, size_t alloc) { +  void* buffer = malloc(alloc); +  if (!buffer) return false; +  *atomic_queue = (struct AtomicQueue){ +      .buffer = buffer, +      .alloc = alloc, +  }; +  atomic_init(&atomic_queue->size, 0); +  return true; +} + +size_t AtomicQueueWrite(struct AtomicQueue* atomic_queue, const void* buffer, +                        size_t size) { +  size_t capacity = +      atomic_queue->alloc - +      atomic_load_explicit(&atomic_queue->size, memory_order_acquire); + +  size_t tail_size = atomic_queue->alloc - atomic_queue->write; +  size_t copy_size = min3(size, capacity, tail_size); +  memcpy((uint8_t*)atomic_queue->buffer + atomic_queue->write, buffer, +         copy_size); + +  size_t offset = copy_size; +  copy_size = min(size - copy_size, capacity - copy_size); +  memcpy(atomic_queue->buffer, (const uint8_t*)buffer + offset, copy_size); + +  offset += copy_size; +  atomic_queue->write = (atomic_queue->write + offset) % atomic_queue->alloc; +  atomic_fetch_add_explicit(&atomic_queue->size, offset, memory_order_release); +  return offset; +} + +size_t AtomicQueueRead(struct AtomicQueue* atomic_queue, void* buffer, +                       size_t size) { +  size_t avail = +      atomic_load_explicit(&atomic_queue->size, memory_order_acquire); + +  size_t tail_size = atomic_queue->alloc - atomic_queue->read; +  size_t copy_size = min3(size, avail, tail_size); +  memcpy(buffer, (const uint8_t*)atomic_queue->buffer + atomic_queue->read, +         copy_size); + +  size_t offset = copy_size; +  copy_size = min(size - copy_size, avail - copy_size); +  memcpy((uint8_t*)buffer + offset, atomic_queue->buffer, copy_size); + +  offset += copy_size; +  atomic_queue->read = (atomic_queue->read + offset) % atomic_queue->alloc; +  atomic_fetch_sub_explicit(&atomic_queue->size, offset, memory_order_release); +  return offset; +} + +void AtomicQueueDestroy(struct AtomicQueue* atomic_queue) { +  free(atomic_queue->buffer); +} diff --git a/atomic_queue.h b/atomic_queue.h new file mode 100644 index 0000000..50bb66d --- /dev/null +++ b/atomic_queue.h @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2024 Mikhail Burakov. This file is part of receiver. + * + * receiver is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * receiver is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with receiver.  If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef ATOMIC_QUEUE_H_ +#define ATOMIC_QUEUE_H_ + +#include <stdatomic.h> +#include <stdbool.h> +#include <stddef.h> + +struct AtomicQueue { +  void* buffer; +  size_t alloc; +  size_t read; +  size_t write; +  atomic_size_t size; +}; + +bool AtomicQueueCreate(struct AtomicQueue* atomic_queue, size_t alloc); +size_t AtomicQueueWrite(struct AtomicQueue* atomic_queue, const void* buffer, +                        size_t size); +size_t AtomicQueueRead(struct AtomicQueue* atomic_queue, void* buffer, +                       size_t size); +void AtomicQueueDestroy(struct AtomicQueue* atomic_queue); + +#endif  // ATOMIC_QUEUE_H_ @@ -0,0 +1,131 @@ +/* + * Copyright (C) 2024 Mikhail Burakov. This file is part of receiver. + * + * receiver is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * receiver is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with receiver.  If not, see <https://www.gnu.org/licenses/>. + */ + +#include "audio.h" + +#include <alsa/asoundlib.h> +#include <errno.h> +#include <stdatomic.h> +#include <stddef.h> +#include <stdint.h> +#include <string.h> +#include <threads.h> + +#include "atomic_queue.h" +#include "toolbox/utils.h" + +struct AudioContext { +  const char* device; +  atomic_bool running; +  struct AtomicQueue queue; +  thrd_t thread; +}; + +static int AudioContextThreadProc(void* arg) { +  struct AudioContext* context = arg; + +  snd_pcm_t* pcm = NULL; +  int err = snd_pcm_open(&pcm, context->device, SND_PCM_STREAM_PLAYBACK, 0); +  if (err) { +    LOG("Failed to open pcm (%s)", snd_strerror(err)); +    atomic_store_explicit(&context->running, 0, memory_order_relaxed); +    return 0; +  } + +  // TODO(mburakov): Read audio configuration from the server. +  err = snd_pcm_set_params(pcm, SND_PCM_FORMAT_S16_LE, +                           SND_PCM_ACCESS_RW_INTERLEAVED, 2, 48000, 1, 10000); +  if (err) { +    LOG("Failed to set pcm params (%s)", snd_strerror(err)); +    atomic_store_explicit(&context->running, 0, memory_order_relaxed); +    goto rollback_pcm; +  } + +  while (atomic_load_explicit(&context->running, memory_order_relaxed)) { +    // TODO(mburakov): Frame size depends on dynamic audio configuration. +    static const unsigned frame_size = sizeof(int16_t) * 2; +    uint8_t buffer[480 * frame_size]; + +    size_t size = AtomicQueueRead(&context->queue, buffer, sizeof(buffer)); +    if (size < sizeof(buffer)) { +      // LOG("Audio queue underflow!"); +      memset(buffer + size, 0, sizeof(buffer) - size); +    } + +    for (snd_pcm_uframes_t offset = 0; offset < sizeof(buffer) / frame_size;) { +      snd_pcm_sframes_t nframes = +          snd_pcm_writei(pcm, buffer + offset * frame_size, +                         sizeof(buffer) / frame_size - offset); +      if (nframes < 0) { +        LOG("Failed to write pcm (%s)", snd_strerror((int)nframes)); +        atomic_store_explicit(&context->running, 0, memory_order_relaxed); +        goto rollback_pcm; +      } +      offset += (snd_pcm_uframes_t)nframes; +    } +  } + +rollback_pcm: +  snd_pcm_close(pcm); +  return 0; +} + +struct AudioContext* AudioContextCreate(const char* device) { +  struct AudioContext* audio_context = malloc(sizeof(struct AudioContext)); +  if (!audio_context) { +    LOG("Failed to allocate context (%s)", strerror(errno)); +    return NULL; +  } + +  audio_context->device = device; +  atomic_init(&audio_context->running, 1); +  if (!AtomicQueueCreate(&audio_context->queue, 4800 * sizeof(int16_t) * 2)) { +    LOG("Failed to create queue (%s)", strerror(errno)); +    goto rollback_context; +  } + +  if (thrd_create(&audio_context->thread, AudioContextThreadProc, +                  audio_context) != thrd_success) { +    LOG("Failed to create thread (%s)", strerror(errno)); +    goto rollback_queue; +  } +  return audio_context; + +rollback_queue: +  AtomicQueueDestroy(&audio_context->queue); +rollback_context: +  free(audio_context); +  return NULL; +} + +bool AudioContextDecode(struct AudioContext* audio_context, const void* buffer, +                        size_t size) { +  if (!atomic_load_explicit(&audio_context->running, memory_order_relaxed)) { +    LOG("Audio thread was stopped early!"); +    return false; +  } +  if (AtomicQueueWrite(&audio_context->queue, buffer, size) < size) +    LOG("Audio queue overflow!"); +  return true; +} + +void AudioContextDestroy(struct AudioContext* audio_context) { +  atomic_store_explicit(&audio_context->running, 0, memory_order_relaxed); +  thrd_join(audio_context->thread, NULL); +  AtomicQueueDestroy(&audio_context->queue); +  free(audio_context); +} @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2024 Mikhail Burakov. This file is part of receiver. + * + * receiver is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * receiver is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with receiver.  If not, see <https://www.gnu.org/licenses/>. + */ + +#ifndef RECEIVER_AUDIO_H_ +#define RECEIVER_AUDIO_H_ + +#include <stdbool.h> +#include <stddef.h> + +struct AudioContext; + +struct AudioContext* AudioContextCreate(const char* device); +bool AudioContextDecode(struct AudioContext* audio_context, const void* buffer, +                        size_t size); +void AudioContextDestroy(struct AudioContext* audio_context); + +#endif  // RECEIVER_AUDIO_H_ @@ -30,6 +30,7 @@  #include <sys/timerfd.h>  #include <unistd.h> +#include "audio.h"  #include "decode.h"  #include "input.h"  #include "proto.h" @@ -49,6 +50,7 @@ struct Context {    size_t overlay_height;    struct Overlay* overlay;    struct DecodeContext* decode_context; +  struct AudioContext* audio_context;    struct Buffer buffer;    size_t video_bitstream; @@ -142,7 +144,8 @@ static void GetMaxOverlaySize(size_t* width, size_t* height) {    *height = 4 + 12 * 3 + 4;  } -static struct Context* ContextCreate(int sock, bool no_input, bool stats) { +static struct Context* ContextCreate(int sock, bool no_input, bool stats, +                                     const char* audio_device) {    struct Context* context = calloc(1, sizeof(struct Context));    if (!context) {      LOG("Failed to allocate context (%s)", strerror(errno)); @@ -190,8 +193,18 @@ static struct Context* ContextCreate(int sock, bool no_input, bool stats) {      LOG("Failed to create decode context");      goto rollback_overlay;    } + +  if (audio_device) { +    context->audio_context = AudioContextCreate(audio_device); +    if (!context->audio_context) { +      LOG("Failed to create audio context"); +      goto rollback_decode_context; +    } +  }    return context; +rollback_decode_context: +  DecodeContextDestroy(context->decode_context);  rollback_overlay:    if (context->overlay) OverlayDestroy(context->overlay);  rollback_window: @@ -284,6 +297,17 @@ static bool HandleVideoStream(struct Context* context) {    return true;  } +static bool HandleAudioStream(struct Context* context) { +  const struct Proto* proto = context->buffer.data; +  if (!context->audio_context) return true; +  if (!AudioContextDecode(context->audio_context, proto->data, proto->size)) { +    LOG("Failed to decode incoming audio data"); +    return false; +  } + +  return true; +} +  static bool DemuxProtoStream(int sock, struct Context* context) {    switch (BufferAppendFrom(&context->buffer, sock)) {      case -1: @@ -313,6 +337,11 @@ again:          return false;        }        break; +    case PROTO_TYPE_AUDIO: +      if (!HandleAudioStream(context)) { +        LOG("Failed to handle audio stream"); +        return false; +      }    }    BufferDiscard(&context->buffer, sizeof(struct Proto) + proto->size); @@ -344,6 +373,7 @@ static bool SendPingMessage(int sock, int timer_fd, struct Context* context) {  static void ContextDestroy(struct Context* context) {    BufferDestroy(&context->buffer); +  if (context->audio_context) AudioContextDestroy(context->audio_context);    DecodeContextDestroy(context->decode_context);    if (context->overlay) OverlayDestroy(context->overlay);    WindowDestroy(context->window); @@ -352,7 +382,8 @@ static void ContextDestroy(struct Context* context) {  int main(int argc, char* argv[]) {    if (argc < 2) { -    LOG("Usage: %s <ip>:<port> [--no-input] [--stats]", argv[0]); +    LOG("Usage: %s <ip>:<port> [--no-input] [--stats] [--audio <device>]", +        argv[0]);      return EXIT_FAILURE;    } @@ -364,15 +395,22 @@ int main(int argc, char* argv[]) {    bool no_input = false;    bool stats = false; +  const char* audio_device = NULL;    for (int i = 2; i < argc; i++) {      if (!strcmp(argv[i], "--no-input")) {        no_input = true;      } else if (!strcmp(argv[i], "--stats")) {        stats = true; +    } else if (!strcmp(argv[i], "--audio")) { +      audio_device = argv[++i]; +      if (i == argc) { +        LOG("Audio argument requires a value"); +        return EXIT_FAILURE; +      }      }    } -  struct Context* context = ContextCreate(sock, no_input, stats); +  struct Context* context = ContextCreate(sock, no_input, stats, audio_device);    if (!context) {      LOG("Failed to create context");      goto rollback_socket; @@ -10,6 +10,7 @@ obj+=\  	toolbox/perf.o  libs:=\ +	alsa \  	libva \  	libva-drm \  	mfx \ | 
