summaryrefslogtreecommitdiff
path: root/atomic_queue.c
blob: 898db3ee6cd88c78d157d5dadf25601d3d28d684 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
/*
 * Copyright (C) 2024 Mikhail Burakov. This file is part of receiver.
 *
 * receiver is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * receiver is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with receiver.  If not, see <https://www.gnu.org/licenses/>.
 */

#include "atomic_queue.h"

#include <stdint.h>
#include <stdlib.h>
#include <string.h>

static size_t min(size_t a, size_t b) { return a < b ? a : b; }
static size_t min3(size_t a, size_t b, size_t c) { return min(min(a, b), c); }

bool AtomicQueueCreate(struct AtomicQueue* atomic_queue, size_t alloc) {
  void* buffer = malloc(alloc);
  if (!buffer) return false;
  *atomic_queue = (struct AtomicQueue){
      .buffer = buffer,
      .alloc = alloc,
  };
  atomic_init(&atomic_queue->size, 0);
  return true;
}

size_t AtomicQueueWrite(struct AtomicQueue* atomic_queue, const void* buffer,
                        size_t size) {
  size_t capacity =
      atomic_queue->alloc -
      atomic_load_explicit(&atomic_queue->size, memory_order_acquire);

  size_t tail_size = atomic_queue->alloc - atomic_queue->write;
  size_t copy_size = min3(size, capacity, tail_size);
  memcpy((uint8_t*)atomic_queue->buffer + atomic_queue->write, buffer,
         copy_size);

  size_t offset = copy_size;
  copy_size = min(size - copy_size, capacity - copy_size);
  memcpy(atomic_queue->buffer, (const uint8_t*)buffer + offset, copy_size);

  offset += copy_size;
  atomic_queue->write = (atomic_queue->write + offset) % atomic_queue->alloc;
  atomic_fetch_add_explicit(&atomic_queue->size, offset, memory_order_release);
  return offset;
}

size_t AtomicQueueRead(struct AtomicQueue* atomic_queue, void* buffer,
                       size_t size) {
  size_t avail =
      atomic_load_explicit(&atomic_queue->size, memory_order_acquire);

  size_t tail_size = atomic_queue->alloc - atomic_queue->read;
  size_t copy_size = min3(size, avail, tail_size);
  memcpy(buffer, (const uint8_t*)atomic_queue->buffer + atomic_queue->read,
         copy_size);

  size_t offset = copy_size;
  copy_size = min(size - copy_size, avail - copy_size);
  memcpy((uint8_t*)buffer + offset, atomic_queue->buffer, copy_size);

  offset += copy_size;
  atomic_queue->read = (atomic_queue->read + offset) % atomic_queue->alloc;
  atomic_fetch_sub_explicit(&atomic_queue->size, offset, memory_order_release);
  return offset;
}

void AtomicQueueDestroy(struct AtomicQueue* atomic_queue) {
  free(atomic_queue->buffer);
}