/*
* Copyright (C) 2024 Mikhail Burakov. This file is part of receiver.
*
* receiver is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* receiver is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with receiver. If not, see .
*/
#include "atomic_queue.h"
#include
#include
#include
static size_t min(size_t a, size_t b) { return a < b ? a : b; }
static size_t min3(size_t a, size_t b, size_t c) { return min(min(a, b), c); }
bool AtomicQueueCreate(struct AtomicQueue* atomic_queue, size_t alloc) {
void* buffer = malloc(alloc);
if (!buffer) return false;
*atomic_queue = (struct AtomicQueue){
.buffer = buffer,
.alloc = alloc,
};
atomic_init(&atomic_queue->size, 0);
return true;
}
size_t AtomicQueueWrite(struct AtomicQueue* atomic_queue, const void* buffer,
size_t size) {
size_t capacity =
atomic_queue->alloc -
atomic_load_explicit(&atomic_queue->size, memory_order_acquire);
size_t tail_size = atomic_queue->alloc - atomic_queue->write;
size_t copy_size = min3(size, capacity, tail_size);
memcpy((uint8_t*)atomic_queue->buffer + atomic_queue->write, buffer,
copy_size);
size_t offset = copy_size;
copy_size = min(size - copy_size, capacity - copy_size);
memcpy(atomic_queue->buffer, (const uint8_t*)buffer + offset, copy_size);
offset += copy_size;
atomic_queue->write = (atomic_queue->write + offset) % atomic_queue->alloc;
atomic_fetch_add_explicit(&atomic_queue->size, offset, memory_order_release);
return offset;
}
size_t AtomicQueueRead(struct AtomicQueue* atomic_queue, void* buffer,
size_t size) {
size_t avail =
atomic_load_explicit(&atomic_queue->size, memory_order_acquire);
size_t tail_size = atomic_queue->alloc - atomic_queue->read;
size_t copy_size = min3(size, avail, tail_size);
memcpy(buffer, (const uint8_t*)atomic_queue->buffer + atomic_queue->read,
copy_size);
size_t offset = copy_size;
copy_size = min(size - copy_size, avail - copy_size);
memcpy((uint8_t*)buffer + offset, atomic_queue->buffer, copy_size);
offset += copy_size;
atomic_queue->read = (atomic_queue->read + offset) % atomic_queue->alloc;
atomic_fetch_sub_explicit(&atomic_queue->size, offset, memory_order_release);
return offset;
}
void AtomicQueueDestroy(struct AtomicQueue* atomic_queue) {
free(atomic_queue->buffer);
}