diff options
author | Mikhail Burakov <mburakov@mailbox.org> | 2022-07-26 12:20:41 +0200 |
---|---|---|
committer | Mikhail Burakov <mburakov@mailbox.org> | 2022-07-26 12:20:41 +0200 |
commit | 5366104ae61e531fbaa7291ba44822b6b38b8b3d (patch) | |
tree | 5aedc1ec5e434365180a6421a9dc335b2bc60f58 /thread_pool.c |
Import existing toolbox components
Diffstat (limited to 'thread_pool.c')
-rw-r--r-- | thread_pool.c | 139 |
1 files changed, 139 insertions, 0 deletions
diff --git a/thread_pool.c b/thread_pool.c new file mode 100644 index 0000000..fa44ef1 --- /dev/null +++ b/thread_pool.c @@ -0,0 +1,139 @@ +/* + * Copyright (C) 2022 Mikhail Burakov. This file is part of toolbox. + * + * toolbox is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * toolbox is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with toolbox. If not, see <https://www.gnu.org/licenses/>. + */ + +#include "thread_pool.h" + +#include <stdatomic.h> +#include <stddef.h> +#include <stdlib.h> +#include <threads.h> + +struct ThreadPool_Task { + void (*fun)(void*); + void* user; +}; + +static _Bool FetchTask(struct ThreadPool* thread_pool, + struct ThreadPool_Task* task) { + for (size_t i = 0; i < thread_pool->tasks_count; i++) { + if (thread_pool->tasks[i].fun) { + *task = thread_pool->tasks[i]; + thread_pool->tasks[i].fun = NULL; + return 1; + } + } + return 0; +} + +static _Bool StoreTask(struct ThreadPool* thread_pool, + const struct ThreadPool_Task* task) { + for (size_t i = 0; i < thread_pool->tasks_count; i++) { + if (!thread_pool->tasks[i].fun) { + thread_pool->tasks[i] = *task; + return 1; + } + } + + size_t tasks_count = thread_pool->tasks_count + 1; + struct ThreadPool_Task* tasks = + realloc(thread_pool->tasks, tasks_count * sizeof(struct ThreadPool_Task)); + if (!tasks) return 0; + + tasks[thread_pool->tasks_count] = *task; + thread_pool->tasks = tasks; + thread_pool->tasks_count = tasks_count; + return 1; +} + +static int ThreadProc(void* arg) { + for (struct ThreadPool* thread_pool = arg; + atomic_load_explicit(&thread_pool->running, memory_order_relaxed);) { + if (mtx_lock(&thread_pool->tasks_mutex) != thrd_success) { + // TODO(mburakov): Could we do something other than just reattempt? + thrd_yield(); + continue; + } + + for (;;) { + if (!atomic_load_explicit(&thread_pool->running, memory_order_relaxed)) { + mtx_unlock(&thread_pool->tasks_mutex); + return 0; + } + struct ThreadPool_Task task; + if (FetchTask(thread_pool, &task)) { + mtx_unlock(&thread_pool->tasks_mutex); + task.fun(task.user); + break; + } + cnd_wait(&thread_pool->tasks_cond, &thread_pool->tasks_mutex); + } + } + return 0; +} + +int ThreadPool_Create(struct ThreadPool* thread_pool, size_t threads_count) { + atomic_init(&thread_pool->running, 1); + thread_pool->threads = malloc(threads_count * sizeof(thrd_t)); + if (!thread_pool->threads) return -1; + + thread_pool->threads_count = 0; + if (cnd_init(&thread_pool->tasks_cond) != thrd_success) goto rollback_threads; + if (mtx_init(&thread_pool->tasks_mutex, mtx_plain) != thrd_success) + goto rollback_tasks_cond; + + thread_pool->tasks = NULL; + thread_pool->tasks_count = 0; + for (; thread_pool->threads_count < threads_count; + thread_pool->threads_count++) { + thrd_t* thread = &thread_pool->threads[thread_pool->threads_count]; + if (thrd_create(thread, ThreadProc, thread_pool) != thrd_success) + goto rollback_running; + } + return 0; + +rollback_running: + atomic_store_explicit(&thread_pool->running, 0, memory_order_relaxed); + cnd_broadcast(&thread_pool->tasks_cond); + while (thread_pool->threads_count-- > 0) + thrd_join(thread_pool->threads[thread_pool->threads_count], NULL); + mtx_destroy(&thread_pool->tasks_mutex); +rollback_tasks_cond: + cnd_destroy(&thread_pool->tasks_cond); +rollback_threads: + free(thread_pool->threads); + return -1; +} + +int ThreadPool_Schedule(struct ThreadPool* thread_pool, void (*fun)(void*), + void* user) { + if (mtx_lock(&thread_pool->tasks_mutex) != thrd_success) return -1; + struct ThreadPool_Task task = {.fun = fun, .user = user}; + _Bool result = StoreTask(thread_pool, &task); + if (result) cnd_broadcast(&thread_pool->tasks_cond); + mtx_unlock(&thread_pool->tasks_mutex); + return result ? 0 : -1; +} + +void ThreadPool_Destroy(struct ThreadPool* thread_pool) { + atomic_store_explicit(&thread_pool->running, 0, memory_order_relaxed); + cnd_broadcast(&thread_pool->tasks_cond); + while (thread_pool->threads_count-- > 0) + thrd_join(thread_pool->threads[thread_pool->threads_count], NULL); + mtx_destroy(&thread_pool->tasks_mutex); + cnd_destroy(&thread_pool->tasks_cond); + free(thread_pool->threads); +} |