diff options
-rw-r--r-- | encode_context.c | 149 | ||||
-rw-r--r-- | main.c | 126 | ||||
-rw-r--r-- | main.cc | 76 | ||||
-rw-r--r-- | makefile | 15 | ||||
-rw-r--r-- | util.h | 48 |
5 files changed, 263 insertions, 151 deletions
diff --git a/encode_context.c b/encode_context.c index 5efe8ca..bef2f17 100644 --- a/encode_context.c +++ b/encode_context.c @@ -19,18 +19,22 @@ #include <assert.h> #include <errno.h> +#include <stdatomic.h> #include <stdio.h> #include <stdlib.h> #include <string.h> +#include <threads.h> #include <va/va.h> #include <va/va_drmcommon.h> #include <va/va_wayland.h> #include "io_context.h" +#include "queue.h" #include "util.h" struct EncodeContext { struct IoContext* io_context; + atomic_bool running; size_t width; size_t height; VADisplay display; @@ -39,8 +43,38 @@ struct EncodeContext { uint32_t packed_headers; VAConfigAttribValEncHEVCFeatures hevc_features; VAConfigAttribValEncHEVCBlockSizes hevc_block_sizes; + + VAContextID context_id; + struct EncodeContextFrame** frames; + size_t frames_count; + + struct Queue input; + struct Queue reuse; + + mtx_t mutex; + cnd_t cond; + thrd_t thread; + atomic_int refcount; }; +static struct EncodeContext* EncodeContextRef( + struct EncodeContext* encode_context) { + atomic_fetch_add_explicit(&encode_context->refcount, 1, memory_order_relaxed); + return encode_context; +} + +static void EncodeContextUnref(struct EncodeContext* encode_context) { + if (atomic_fetch_sub_explicit(&encode_context->refcount, 1, + memory_order_relaxed) == 1) { + assert(vaDestroyContext(encode_context->display, + encode_context->context_id) == VA_STATUS_SUCCESS); + assert(vaDestroyConfig(encode_context->display, + encode_context->config_id) == VA_STATUS_SUCCESS); + assert(vaTerminate(encode_context->display) == VA_STATUS_SUCCESS); + free(encode_context); + } +} + static const char* VaErrorString(VAStatus error) { static const char* kVaErrorStrings[] = { "VA_STATUS_SUCCESS", @@ -165,6 +199,30 @@ static bool InitializeCodecCaps(struct EncodeContext* encode_context) { return true; } +static int EncodeContextThreadProc(void* arg) { + struct EncodeContext* encode_context = arg; + for (;;) { + if (mtx_lock(&encode_context->mutex) != thrd_success) { + LOG("Failed to lock mutex (%s)", strerror(errno)); + goto leave; + } + while ( + atomic_load_explicit(&encode_context->running, memory_order_relaxed)) { + assert(cnd_wait(&encode_context->cond, &encode_context->mutex) == + thrd_success); + } + assert(mtx_unlock(&encode_context->mutex) == thrd_success); + + if (!atomic_load_explicit(&encode_context->running, memory_order_relaxed)) { + goto leave; + } + } + +leave: + atomic_store_explicit(&encode_context->running, false, memory_order_relaxed); + return 0; +} + struct EncodeContext* EncodeContextCreate(struct IoContext* io_context, uint32_t width, uint32_t height, struct wl_display* display) { @@ -210,8 +268,50 @@ struct EncodeContext* EncodeContextCreate(struct IoContext* io_context, goto rollback_config_id; } - return encode_context; + // mburakov: Intel fails badly when min_cb_size value is not set to 16 and + // log2_min_luma_coding_block_size_minus3 is not set to zero. Judging from + // ffmpeg code, calculating one from another should work on other platforms, + // but I hardcoded it instead since AMD is fine with alignment on 16 anyway. + static const size_t kMinCbSize = 16; + size_t aligned_width = + (encode_context->width + kMinCbSize - 1) & ~(kMinCbSize - 1); + size_t aligned_height = + (encode_context->height + kMinCbSize - 1) & ~(kMinCbSize - 1); + status = + vaCreateContext(encode_context->display, encode_context->config_id, + (int)aligned_width, (int)aligned_height, VA_PROGRESSIVE, + NULL, 0, &encode_context->context_id); + if (status != VA_STATUS_SUCCESS) { + LOG("Failed to create va context (%s)", VaErrorString(status)); + goto rollback_config_id; + } + + if (mtx_init(&encode_context->mutex, mtx_plain) != thrd_success) { + LOG("Failed to init mutex (%s)", strerror(errno)); + goto rollback_context_id; + } + + if (cnd_init(&encode_context->cond) != thrd_success) { + LOG("Failed to init condition variable (%s)", strerror(errno)); + goto rollback_mutex; + } + + if (thrd_create(&encode_context->thread, &EncodeContextThreadProc, + io_context) != thrd_success) { + LOG("Failed to create thread (%s)", strerror(errno)); + goto rollback_cond; + } + + return EncodeContextRef(encode_context); + +rollback_cond: + cnd_destroy(&encode_context->cond); +rollback_mutex: + mtx_destroy(&encode_context->mutex); +rollback_context_id: + assert(vaDestroyContext(encode_context->display, + encode_context->context_id) == VA_STATUS_SUCCESS); rollback_config_id: assert(vaDestroyConfig(encode_context->display, encode_context->config_id) == VA_STATUS_SUCCESS); @@ -224,24 +324,49 @@ rollback_encode_context: struct EncodeContextFrame* EncodeContextDequeue( struct EncodeContext* encode_context) { - (void)encode_context; - // TODO(mburakov): Implement this! - return NULL; + if (!atomic_load_explicit(&encode_context->running, memory_order_relaxed)) { + LOG("Encode context is not running"); + return false; + } + + if (mtx_lock(&encode_context->mutex) != thrd_success) { + LOG("Failed to lock mutex"); + return false; + } + + void* item; + if (&QueuePop(encode_context->reuse, )) + +rollback_lock: + assert(mtx_unlock(&encode_context->mutex)); + return false; } bool EncodeContextQueue(struct EncodeContext* encode_context, struct EncodeContextFrame* encode_context_frame, bool encode) { - (void)encode_context; - (void)encode_context_frame; - (void)encode; - // TODO(mburakov): Implement this! + if (mtx_lock(&encode_context->mutex) != thrd_success) { + LOG("Failed to lock mutex"); + return false; + } + + struct Queue* queue = + encode ? &encode_context->input : &encode_context->reuse; + if (!QueuePush(queue, encode_context_frame)) { + LOG("Failed to queue frame (%s)", strerror(errno)); + goto rollback_lock; + } + return true; + +rollback_lock: + assert(mtx_unlock(&encode_context->mutex)); + return false; } void EncodeContextDestroy(struct EncodeContext* encode_context) { - assert(vaDestroyConfig(encode_context->display, encode_context->config_id) == - VA_STATUS_SUCCESS); - assert(vaTerminate(encode_context->display) == VA_STATUS_SUCCESS); - free(encode_context); + atomic_store_explicit(&encode_context->running, false, memory_order_relaxed); + assert(cnd_broadcast(&encode_context->cond) == thrd_success); + assert(thrd_join(encode_context->thread, NULL) == thrd_success); + EncodeContextUnref(encode_context); } @@ -1,126 +0,0 @@ -/* - * Copyright (C) 2024 Mikhail Burakov. This file is part of streamer. - * - * streamer is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * streamer is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with streamer. If not, see <https://www.gnu.org/licenses/>. - */ - -#include <errno.h> -#include <pipewire/pipewire.h> -#include <signal.h> -#include <stdbool.h> -#include <stddef.h> -#include <stdio.h> -#include <stdlib.h> -#include <string.h> - -#include "audio_context.h" -#include "io_context.h" -#include "proto.h" -#include "util.h" -#include "video_context.h" - -static volatile sig_atomic_t g_signal; -static void OnSignal(int status) { g_signal = status; } - -static bool SetupSignalHandler(int sig, void (*func)(int)) { - struct sigaction sa = { - .sa_handler = func, - }; - if (sigemptyset(&sa.sa_mask) || sigaddset(&sa.sa_mask, sig)) { - LOG("Failed to configure signal set (%s)", strerror(errno)); - return false; - } - if (sigaction(sig, &sa, NULL)) { - LOG("Failed to set signal action (%s)", strerror(errno)); - return false; - } - return true; -} - -static void HandleClientSession(struct IoContext* io_context) { - struct VideoContext* video_context = VideoContextCreate(io_context); - if (!video_context) { - LOG("Failed to create video context"); - return; - } - - struct AudioContext* audio_context = NULL; - while (!g_signal) { - struct Proto* proto = IoContextRead(io_context); - if (!proto) { - LOG("Failed to read proto"); - goto leave; - } - - switch (proto->header->type) { - case kProtoTypeHello: - if (audio_context) { - LOG("Audio reconfiguration prohibited"); - proto->Destroy(proto); - goto leave; - } - audio_context = AudioContextCreate(io_context, proto); - if (!audio_context) { - LOG("Failed to create audio context"); - goto leave; - } - break; - case kProtoTypePing: - case kProtoTypeUhid: - break; - default: - LOG("Unexpected proto received"); - proto->Destroy(proto); - goto leave; - } - } - -leave: - if (audio_context) AudioContextDestroy(audio_context); - VideoContextDestroy(video_context); -} - -int main(int argc, char* argv[]) { - pw_init(&argc, &argv); - if (argc < 2) { - LOG("Usage: streamer <port>"); - goto leave; - } - - int port = atoi(argv[1]); - if (0 >= port || port > UINT16_MAX) { - LOG("Invalid port \"%s\"", argv[1]); - goto leave; - } - - SetupSignalHandler(SIGINT, OnSignal); - SetupSignalHandler(SIGPIPE, SIG_IGN); - SetupSignalHandler(SIGTERM, OnSignal); - - while (!g_signal) { - struct IoContext* io_context = IoContextCreate((uint16_t)port); - if (!io_context) { - LOG("Failed to create io context"); - goto leave; - } - - HandleClientSession(io_context); - IoContextDestroy(io_context); - } - -leave: - pw_deinit(); - bool result = g_signal == SIGINT || g_signal == SIGTERM; - return result ? EXIT_SUCCESS : EXIT_FAILURE; -} @@ -0,0 +1,76 @@ +/* + * Copyright (C) 2024 Mikhail Burakov. This file is part of streamer. + * + * streamer is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * streamer is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with streamer. If not, see <https://www.gnu.org/licenses/>. + */ + +#include <pipewire/pipewire.h> + +#include <csignal> +#include <cstdint> +#include <cstdlib> +#include <exception> +#include <iostream> +#include <system_error> + +#include "util.h" + +namespace { + +volatile sig_atomic_t g_signal; +void OnSignal(int status) { g_signal = status; } + +void SetupSignalHandler(int sig, void (*func)(int)) { + struct sigaction sa {}; + sa.sa_handler = func; + if (sigemptyset(&sa.sa_mask) || sigaddset(&sa.sa_mask, sig)) { + throw std::system_error(errno, std::system_category(), + FROM_HERE "Failed to configure signal set"); + } + if (sigaction(sig, &sa, NULL)) { + throw std::system_error(errno, std::system_category(), + FROM_HERE "Failed to set signal action"); + } +} + +} // namespace + +int main(int argc, char* argv[]) { + try { + if (argc < 2) { + throw std::runtime_error(FROM_HERE "Usage: streamer <port>"); + } + + int port = std::atoi(argv[1]); + if (0 >= port || port > UINT16_MAX) { + throw std::runtime_error(FROM_HERE "Invalid port number"); + } + + pw_init(&argc, &argv); + Defer defer_pw_deinit([] { pw_deinit(); }); + + SetupSignalHandler(SIGINT, OnSignal); + SetupSignalHandler(SIGPIPE, SIG_IGN); + SetupSignalHandler(SIGTERM, OnSignal); + + while (!g_signal) { + // BLAH + } + + return EXIT_SUCCESS; + } catch (const std::exception& ex) { + std::cerr << ex.what() << std::endl; + return EXIT_FAILURE; + } +} @@ -1,6 +1,6 @@ bin:=$(notdir $(shell pwd)) -src:=$(wildcard *.c) -obj:=$(src:.c=.o) +src:=$(wildcard *.cc) +obj:=$(src:.cc=.o) libs:=\ egl \ @@ -21,6 +21,9 @@ res:=\ luma.glsl \ chroma.glsl +CXXFLAGS+=\ + -std=c++20 + obj:=$(patsubst %,%.o,$(protocols)) $(obj) headers:=$(patsubst %,%.h,$(protocols)) @@ -36,15 +39,15 @@ LDFLAGS+= \ all: $(bin) $(bin): $(obj) - $(CC) $^ $(LDFLAGS) -o $@ + $(CXX) $^ $(LDFLAGS) -o $@ -%.o: %.c *.h $(res) $(headers) - $(CC) -c $< $(CFLAGS) -o $@ +%.o: %.cc *.h $(res) $(headers) + $(CXX) -c $< $(CFLAGS) $(CXXFLAGS) -o $@ %.h: $(protocols_dir)/%.xml wayland-scanner client-header $< $@ -%.c: $(protocols_dir)/%.xml +%.cc: $(protocols_dir)/%.xml wayland-scanner private-code $< $@ clean: @@ -18,15 +18,49 @@ #ifndef STREAMER_UTIL_H_ #define STREAMER_UTIL_H_ -#define MIN(a, b) ((a) < (b) ? (a) : (b)) -#define MAX(a, b) ((a) > (b) ? (a) : (b)) - -#define LENGTH(x) (sizeof(x) / sizeof *(x)) - #define STR_IMPL(x) #x #define STR(x) STR_IMPL(x) +#define FROM_HERE __FILE__ ":" STR(__LINE__) " " + +template <class T> +struct Defer { + Defer(T&& op) : op_{op} {} + ~Defer() { op_(); } + const T& op_; +}; + +template <class T> +Defer(T&&) -> Defer<T>; + +template <auto dflt, auto closer> +struct PodCloser { + struct pointer { + decltype(dflt) value_; + pointer() : value_{dflt} {} + pointer(decltype(nullptr)) : value_{dflt} {} + pointer(decltype(dflt) value) : value_{value} {} + explicit operator bool() const { return value_ != dflt; } + friend bool operator==(pointer, pointer) = default; + operator decltype(dflt)() const { return value_; } + }; + void operator()(const pointer ptr) { + if (ptr.value_ != dflt) closer(ptr.value_); + } +}; + +template <class T> +struct ClassOf; + +template <class R, class C> +struct ClassOf<R C::*> { + using T = C; +}; -#define LOG(x, ...) \ - fprintf(stderr, __FILE__ ":" STR(__LINE__) " " x "\n", ##__VA_ARGS__) +template <auto fun, class T, class... Args> +inline auto Trampoline(T* head, Args... tail) { + using C = ClassOf<decltype(fun)>::T; + auto self = static_cast<C*>(head); + return (self->*fun)(tail...); +} #endif // STREAMER_UTIL_H_ |