author     Mikhail Burakov <mburakov@mailbox.org>  2022-07-26 12:20:41 +0200
committer  Mikhail Burakov <mburakov@mailbox.org>  2022-07-26 12:20:41 +0200
commit     5366104ae61e531fbaa7291ba44822b6b38b8b3d (patch)
tree       5aedc1ec5e434365180a6421a9dc335b2bc60f58 /thread_pool.c
Import existing toolbox components
Diffstat (limited to 'thread_pool.c')
-rw-r--r--  thread_pool.c  139
1 file changed, 139 insertions, 0 deletions
diff --git a/thread_pool.c b/thread_pool.c
new file mode 100644
index 0000000..fa44ef1
--- /dev/null
+++ b/thread_pool.c
@@ -0,0 +1,139 @@
+/*
+ * Copyright (C) 2022 Mikhail Burakov. This file is part of toolbox.
+ *
+ * toolbox is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * toolbox is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with toolbox. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#include "thread_pool.h"
+
+#include <stdatomic.h>
+#include <stddef.h>
+#include <stdlib.h>
+#include <threads.h>
+
+struct ThreadPool_Task {
+ void (*fun)(void*);
+ void* user;
+};
+
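+// Removes the first pending task from the list and copies it to *task.
+// Returns 1 on success, 0 if no task is pending. Caller holds tasks_mutex.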
+static _Bool FetchTask(struct ThreadPool* thread_pool,
+ struct ThreadPool_Task* task) {
+ for (size_t i = 0; i < thread_pool->tasks_count; i++) {
+ if (thread_pool->tasks[i].fun) {
+ *task = thread_pool->tasks[i];
+ thread_pool->tasks[i].fun = NULL;
+ return 1;
+ }
+ }
+ return 0;
+}
+
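+// Copies *task into the first free slot, growing the list by one slot when
+// it is full. Returns 1 on success, 0 if the allocation fails. Caller holds
+// tasks_mutex.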
+static _Bool StoreTask(struct ThreadPool* thread_pool,
+ const struct ThreadPool_Task* task) {
+ for (size_t i = 0; i < thread_pool->tasks_count; i++) {
+ if (!thread_pool->tasks[i].fun) {
+ thread_pool->tasks[i] = *task;
+ return 1;
+ }
+ }
+
+ size_t tasks_count = thread_pool->tasks_count + 1;
+ struct ThreadPool_Task* tasks =
+ realloc(thread_pool->tasks, tasks_count * sizeof(struct ThreadPool_Task));
+ if (!tasks) return 0;
+
+ tasks[thread_pool->tasks_count] = *task;
+ thread_pool->tasks = tasks;
+ thread_pool->tasks_count = tasks_count;
+ return 1;
+}
+
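+// Worker thread entry point: repeatedly fetches and runs queued tasks,
+// sleeping on tasks_cond while the list is empty, until running is cleared.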
+static int ThreadProc(void* arg) {
+ for (struct ThreadPool* thread_pool = arg;
+ atomic_load_explicit(&thread_pool->running, memory_order_relaxed);) {
+ if (mtx_lock(&thread_pool->tasks_mutex) != thrd_success) {
+ // TODO(mburakov): Could we do something other than just reattempt?
+ thrd_yield();
+ continue;
+ }
+
+ for (;;) {
+ if (!atomic_load_explicit(&thread_pool->running, memory_order_relaxed)) {
+ mtx_unlock(&thread_pool->tasks_mutex);
+ return 0;
+ }
+ struct ThreadPool_Task task;
+ if (FetchTask(thread_pool, &task)) {
+ mtx_unlock(&thread_pool->tasks_mutex);
+ task.fun(task.user);
+ break;
+ }
+ cnd_wait(&thread_pool->tasks_cond, &thread_pool->tasks_mutex);
+ }
+ }
+ return 0;
+}
+
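+// Initializes *thread_pool and spawns threads_count worker threads.
+// Returns 0 on success and -1 on failure, leaving no partial state behind.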
+int ThreadPool_Create(struct ThreadPool* thread_pool, size_t threads_count) {
+ atomic_init(&thread_pool->running, 1);
+ thread_pool->threads = malloc(threads_count * sizeof(thrd_t));
+ if (!thread_pool->threads) return -1;
+
+ thread_pool->threads_count = 0;
+ if (cnd_init(&thread_pool->tasks_cond) != thrd_success) goto rollback_threads;
+ if (mtx_init(&thread_pool->tasks_mutex, mtx_plain) != thrd_success)
+ goto rollback_tasks_cond;
+
+ thread_pool->tasks = NULL;
+ thread_pool->tasks_count = 0;
+ for (; thread_pool->threads_count < threads_count;
+ thread_pool->threads_count++) {
+ thrd_t* thread = &thread_pool->threads[thread_pool->threads_count];
+ if (thrd_create(thread, ThreadProc, thread_pool) != thrd_success)
+ goto rollback_running;
+ }
+ return 0;
+
+rollback_running:
+  // Clear the running flag while holding the mutex: otherwise a worker could
+  // observe the flag as set and then miss the broadcast before it enters
+  // cnd_wait, sleeping forever.
+  mtx_lock(&thread_pool->tasks_mutex);
+  atomic_store_explicit(&thread_pool->running, 0, memory_order_relaxed);
+  cnd_broadcast(&thread_pool->tasks_cond);
+  mtx_unlock(&thread_pool->tasks_mutex);
+  while (thread_pool->threads_count-- > 0)
+    thrd_join(thread_pool->threads[thread_pool->threads_count], NULL);
+  mtx_destroy(&thread_pool->tasks_mutex);
+rollback_tasks_cond:
+ cnd_destroy(&thread_pool->tasks_cond);
+rollback_threads:
+ free(thread_pool->threads);
+ return -1;
+}
+
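+// Queues fun(user) for execution on some worker thread.
+// Returns 0 on success and -1 on locking or allocation failure.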
+int ThreadPool_Schedule(struct ThreadPool* thread_pool, void (*fun)(void*),
+ void* user) {
+ if (mtx_lock(&thread_pool->tasks_mutex) != thrd_success) return -1;
+ struct ThreadPool_Task task = {.fun = fun, .user = user};
+ _Bool result = StoreTask(thread_pool, &task);
+ if (result) cnd_broadcast(&thread_pool->tasks_cond);
+ mtx_unlock(&thread_pool->tasks_mutex);
+ return result ? 0 : -1;
+}
+
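+// Stops and joins all worker threads, then releases pool resources. Pending
+// tasks that no worker picked up yet are dropped, not executed.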
+void ThreadPool_Destroy(struct ThreadPool* thread_pool) {
+  // Same as in the creation rollback above: take the mutex before clearing
+  // the running flag so no worker can miss the wakeup.
+  mtx_lock(&thread_pool->tasks_mutex);
+  atomic_store_explicit(&thread_pool->running, 0, memory_order_relaxed);
+  cnd_broadcast(&thread_pool->tasks_cond);
+  mtx_unlock(&thread_pool->tasks_mutex);
+  while (thread_pool->threads_count-- > 0)
+    thrd_join(thread_pool->threads[thread_pool->threads_count], NULL);
+  mtx_destroy(&thread_pool->tasks_mutex);
+  cnd_destroy(&thread_pool->tasks_cond);
+  free(thread_pool->tasks);
+  free(thread_pool->threads);
+}
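+
+/*
+ * Usage sketch (illustrative only): PrintValue is a hypothetical
+ * caller-provided task, and struct ThreadPool is the pool type declared in
+ * thread_pool.h.
+ *
+ *   static void PrintValue(void* user) { printf("%d\n", *(int*)user); }
+ *
+ *   struct ThreadPool pool;
+ *   int value = 42;
+ *   if (ThreadPool_Create(&pool, 4) == 0) {
+ *     ThreadPool_Schedule(&pool, PrintValue, &value);
+ *     ThreadPool_Destroy(&pool);
+ *   }
+ *
+ * Note that ThreadPool_Destroy stops the workers without draining the task
+ * list, so PrintValue may never run if it is still pending at that point.
+ */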