summaryrefslogtreecommitdiff
path: root/atomic_queue.c
diff options
context:
space:
mode:
authorMikhail Burakov <mburakov@mailbox.org>2024-05-05 12:51:07 +0200
committerMikhail Burakov <mburakov@mailbox.org>2024-05-19 10:15:07 +0200
commitf593c2fd65cab3275c98c100c6b7d775e24157f9 (patch)
tree11891ecf9584594896ccb64b8cc91dafcdee23b6 /atomic_queue.c
parentf5df0d289d1a06b325b70f9e1c2083ce6080d98a (diff)
Initial primitive audio pipeline implementation
Diffstat (limited to 'atomic_queue.c')
-rw-r--r--atomic_queue.c81
1 files changed, 81 insertions, 0 deletions
diff --git a/atomic_queue.c b/atomic_queue.c
new file mode 100644
index 0000000..898db3e
--- /dev/null
+++ b/atomic_queue.c
@@ -0,0 +1,81 @@
+/*
+ * Copyright (C) 2024 Mikhail Burakov. This file is part of receiver.
+ *
+ * receiver is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * receiver is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with receiver. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "atomic_queue.h"
+
+#include <stdint.h>
+#include <stdlib.h>
+#include <string.h>
+
+static size_t min(size_t a, size_t b) { return a < b ? a : b; }
+static size_t min3(size_t a, size_t b, size_t c) { return min(min(a, b), c); }
+
+bool AtomicQueueCreate(struct AtomicQueue* atomic_queue, size_t alloc) {
+ void* buffer = malloc(alloc);
+ if (!buffer) return false;
+ *atomic_queue = (struct AtomicQueue){
+ .buffer = buffer,
+ .alloc = alloc,
+ };
+ atomic_init(&atomic_queue->size, 0);
+ return true;
+}
+
+size_t AtomicQueueWrite(struct AtomicQueue* atomic_queue, const void* buffer,
+ size_t size) {
+ size_t capacity =
+ atomic_queue->alloc -
+ atomic_load_explicit(&atomic_queue->size, memory_order_acquire);
+
+ size_t tail_size = atomic_queue->alloc - atomic_queue->write;
+ size_t copy_size = min3(size, capacity, tail_size);
+ memcpy((uint8_t*)atomic_queue->buffer + atomic_queue->write, buffer,
+ copy_size);
+
+ size_t offset = copy_size;
+ copy_size = min(size - copy_size, capacity - copy_size);
+ memcpy(atomic_queue->buffer, (const uint8_t*)buffer + offset, copy_size);
+
+ offset += copy_size;
+ atomic_queue->write = (atomic_queue->write + offset) % atomic_queue->alloc;
+ atomic_fetch_add_explicit(&atomic_queue->size, offset, memory_order_release);
+ return offset;
+}
+
+size_t AtomicQueueRead(struct AtomicQueue* atomic_queue, void* buffer,
+ size_t size) {
+ size_t avail =
+ atomic_load_explicit(&atomic_queue->size, memory_order_acquire);
+
+ size_t tail_size = atomic_queue->alloc - atomic_queue->read;
+ size_t copy_size = min3(size, avail, tail_size);
+ memcpy(buffer, (const uint8_t*)atomic_queue->buffer + atomic_queue->read,
+ copy_size);
+
+ size_t offset = copy_size;
+ copy_size = min(size - copy_size, avail - copy_size);
+ memcpy((uint8_t*)buffer + offset, atomic_queue->buffer, copy_size);
+
+ offset += copy_size;
+ atomic_queue->read = (atomic_queue->read + offset) % atomic_queue->alloc;
+ atomic_fetch_sub_explicit(&atomic_queue->size, offset, memory_order_release);
+ return offset;
+}
+
+void AtomicQueueDestroy(struct AtomicQueue* atomic_queue) {
+ free(atomic_queue->buffer);
+}