summaryrefslogtreecommitdiff
path: root/buffer_queue.c
blob: bd29b149087af6ebd9df6625324c1b335bbe775f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/*
 * Copyright (C) 2024 Mikhail Burakov. This file is part of streamer.
 *
 * streamer is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * streamer is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with streamer.  If not, see <https://www.gnu.org/licenses/>.
 */

#include "buffer_queue.h"

#include <stdlib.h>
#include <string.h>
#include <threads.h>

struct BufferQueue {
  mtx_t mutex;
  struct BufferQueueItem** items;
  size_t size;
  size_t alloc;
};

struct BufferQueueItem* BufferQueueItemCreate(const void* data, size_t size) {
  struct BufferQueueItem* buffer_queue_item =
      malloc(sizeof(struct BufferQueueItem) + size);
  if (!buffer_queue_item) return NULL;
  buffer_queue_item->size = size;
  memcpy(buffer_queue_item->data, data, size);
  return buffer_queue_item;
}

void BufferQueueItemDestroy(struct BufferQueueItem* buffer_queue_item) {
  free(buffer_queue_item);
}

struct BufferQueue* BufferQueueCreate(void) {
  struct BufferQueue* buffer_queue = calloc(1, sizeof(struct BufferQueue));
  if (!buffer_queue) return false;
  if (mtx_init(&buffer_queue->mutex, mtx_plain) != thrd_success)
    goto rollback_buffer_queue;
  return buffer_queue;

rollback_buffer_queue:
  free(buffer_queue);
  return NULL;
}

bool BufferQueueQueue(struct BufferQueue* buffer_queue,
                      struct BufferQueueItem* buffer_queue_item) {
  if (!buffer_queue_item || mtx_lock(&buffer_queue->mutex) != thrd_success)
    return false;

  if (buffer_queue->size == buffer_queue->alloc) {
    size_t alloc = buffer_queue->alloc + 1;
    struct BufferQueueItem** items =
        realloc(buffer_queue->items, sizeof(struct BufferQueueItem*) * alloc);
    if (!items) {
      mtx_unlock(&buffer_queue->mutex);
      return false;
    }
    buffer_queue->items = items;
    buffer_queue->alloc = alloc;
  }

  buffer_queue->items[buffer_queue->size] = buffer_queue_item;
  buffer_queue->size++;
  mtx_unlock(&buffer_queue->mutex);
  return true;
}

bool BufferQueueDequeue(struct BufferQueue* buffer_queue,
                        struct BufferQueueItem** buffer_queue_item) {
  if (mtx_lock(&buffer_queue->mutex) != thrd_success) return false;
  if (!buffer_queue->size) {
    *buffer_queue_item = NULL;
  } else {
    buffer_queue->size--;
    *buffer_queue_item = buffer_queue->items[0];
    memmove(buffer_queue->items, buffer_queue->items + 1,
            sizeof(struct BufferQueueItem*) * buffer_queue->size);
  }
  mtx_unlock(&buffer_queue->mutex);
  return true;
}

void BufferQueueDestroy(struct BufferQueue* buffer_queue) {
  for (size_t i = 0; i < buffer_queue->size; i++)
    BufferQueueItemDestroy(buffer_queue->items[i]);
  free(buffer_queue->items);
  mtx_destroy(&buffer_queue->mutex);
  free(buffer_queue);
}