diff options
| -rw-r--r-- | encode_context.c | 149 | ||||
| -rw-r--r-- | main.c | 126 | ||||
| -rw-r--r-- | main.cc | 76 | ||||
| -rw-r--r-- | makefile | 15 | ||||
| -rw-r--r-- | util.h | 48 | 
5 files changed, 263 insertions, 151 deletions
| diff --git a/encode_context.c b/encode_context.c index 5efe8ca..bef2f17 100644 --- a/encode_context.c +++ b/encode_context.c @@ -19,18 +19,22 @@  #include <assert.h>  #include <errno.h> +#include <stdatomic.h>  #include <stdio.h>  #include <stdlib.h>  #include <string.h> +#include <threads.h>  #include <va/va.h>  #include <va/va_drmcommon.h>  #include <va/va_wayland.h>  #include "io_context.h" +#include "queue.h"  #include "util.h"  struct EncodeContext {    struct IoContext* io_context; +  atomic_bool running;    size_t width;    size_t height;    VADisplay display; @@ -39,8 +43,38 @@ struct EncodeContext {    uint32_t packed_headers;    VAConfigAttribValEncHEVCFeatures hevc_features;    VAConfigAttribValEncHEVCBlockSizes hevc_block_sizes; + +  VAContextID context_id; +  struct EncodeContextFrame** frames; +  size_t frames_count; + +  struct Queue input; +  struct Queue reuse; + +  mtx_t mutex; +  cnd_t cond; +  thrd_t thread; +  atomic_int refcount;  }; +static struct EncodeContext* EncodeContextRef( +    struct EncodeContext* encode_context) { +  atomic_fetch_add_explicit(&encode_context->refcount, 1, memory_order_relaxed); +  return encode_context; +} + +static void EncodeContextUnref(struct EncodeContext* encode_context) { +  if (atomic_fetch_sub_explicit(&encode_context->refcount, 1, +                                memory_order_relaxed) == 1) { +    assert(vaDestroyContext(encode_context->display, +                            encode_context->context_id) == VA_STATUS_SUCCESS); +    assert(vaDestroyConfig(encode_context->display, +                           encode_context->config_id) == VA_STATUS_SUCCESS); +    assert(vaTerminate(encode_context->display) == VA_STATUS_SUCCESS); +    free(encode_context); +  } +} +  static const char* VaErrorString(VAStatus error) {    static const char* kVaErrorStrings[] = {        "VA_STATUS_SUCCESS", @@ -165,6 +199,30 @@ static bool InitializeCodecCaps(struct EncodeContext* encode_context) {    return true;  } +static int EncodeContextThreadProc(void* arg) { +  struct EncodeContext* encode_context = arg; +  for (;;) { +    if (mtx_lock(&encode_context->mutex) != thrd_success) { +      LOG("Failed to lock mutex (%s)", strerror(errno)); +      goto leave; +    } +    while ( +        atomic_load_explicit(&encode_context->running, memory_order_relaxed)) { +      assert(cnd_wait(&encode_context->cond, &encode_context->mutex) == +             thrd_success); +    } +    assert(mtx_unlock(&encode_context->mutex) == thrd_success); + +    if (!atomic_load_explicit(&encode_context->running, memory_order_relaxed)) { +      goto leave; +    } +  } + +leave: +  atomic_store_explicit(&encode_context->running, false, memory_order_relaxed); +  return 0; +} +  struct EncodeContext* EncodeContextCreate(struct IoContext* io_context,                                            uint32_t width, uint32_t height,                                            struct wl_display* display) { @@ -210,8 +268,50 @@ struct EncodeContext* EncodeContextCreate(struct IoContext* io_context,      goto rollback_config_id;    } -  return encode_context; +  // mburakov: Intel fails badly when min_cb_size value is not set to 16 and +  // log2_min_luma_coding_block_size_minus3 is not set to zero. Judging from +  // ffmpeg code, calculating one from another should work on other platforms, +  // but I hardcoded it instead since AMD is fine with alignment on 16 anyway. +  static const size_t kMinCbSize = 16; +  size_t aligned_width = +      (encode_context->width + kMinCbSize - 1) & ~(kMinCbSize - 1); +  size_t aligned_height = +      (encode_context->height + kMinCbSize - 1) & ~(kMinCbSize - 1); +  status = +      vaCreateContext(encode_context->display, encode_context->config_id, +                      (int)aligned_width, (int)aligned_height, VA_PROGRESSIVE, +                      NULL, 0, &encode_context->context_id); +  if (status != VA_STATUS_SUCCESS) { +    LOG("Failed to create va context (%s)", VaErrorString(status)); +    goto rollback_config_id; +  } + +  if (mtx_init(&encode_context->mutex, mtx_plain) != thrd_success) { +    LOG("Failed to init mutex (%s)", strerror(errno)); +    goto rollback_context_id; +  } + +  if (cnd_init(&encode_context->cond) != thrd_success) { +    LOG("Failed to init condition variable (%s)", strerror(errno)); +    goto rollback_mutex; +  } + +  if (thrd_create(&encode_context->thread, &EncodeContextThreadProc, +                  io_context) != thrd_success) { +    LOG("Failed to create thread (%s)", strerror(errno)); +    goto rollback_cond; +  } + +  return EncodeContextRef(encode_context); + +rollback_cond: +  cnd_destroy(&encode_context->cond); +rollback_mutex: +  mtx_destroy(&encode_context->mutex); +rollback_context_id: +  assert(vaDestroyContext(encode_context->display, +                          encode_context->context_id) == VA_STATUS_SUCCESS);  rollback_config_id:    assert(vaDestroyConfig(encode_context->display, encode_context->config_id) ==           VA_STATUS_SUCCESS); @@ -224,24 +324,49 @@ rollback_encode_context:  struct EncodeContextFrame* EncodeContextDequeue(      struct EncodeContext* encode_context) { -  (void)encode_context; -  // TODO(mburakov): Implement this! -  return NULL; +  if (!atomic_load_explicit(&encode_context->running, memory_order_relaxed)) { +    LOG("Encode context is not running"); +    return false; +  } + +  if (mtx_lock(&encode_context->mutex) != thrd_success) { +    LOG("Failed to lock mutex"); +    return false; +  } + +  void* item; +  if (&QueuePop(encode_context->reuse, )) + +rollback_lock: +  assert(mtx_unlock(&encode_context->mutex)); +  return false;  }  bool EncodeContextQueue(struct EncodeContext* encode_context,                          struct EncodeContextFrame* encode_context_frame,                          bool encode) { -  (void)encode_context; -  (void)encode_context_frame; -  (void)encode; -  // TODO(mburakov): Implement this! +  if (mtx_lock(&encode_context->mutex) != thrd_success) { +    LOG("Failed to lock mutex"); +    return false; +  } + +  struct Queue* queue = +      encode ? &encode_context->input : &encode_context->reuse; +  if (!QueuePush(queue, encode_context_frame)) { +    LOG("Failed to queue frame (%s)", strerror(errno)); +    goto rollback_lock; +  } +    return true; + +rollback_lock: +  assert(mtx_unlock(&encode_context->mutex)); +  return false;  }  void EncodeContextDestroy(struct EncodeContext* encode_context) { -  assert(vaDestroyConfig(encode_context->display, encode_context->config_id) == -         VA_STATUS_SUCCESS); -  assert(vaTerminate(encode_context->display) == VA_STATUS_SUCCESS); -  free(encode_context); +  atomic_store_explicit(&encode_context->running, false, memory_order_relaxed); +  assert(cnd_broadcast(&encode_context->cond) == thrd_success); +  assert(thrd_join(encode_context->thread, NULL) == thrd_success); +  EncodeContextUnref(encode_context);  } @@ -1,126 +0,0 @@ -/* - * Copyright (C) 2024 Mikhail Burakov. This file is part of streamer. - * - * streamer is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * streamer is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with streamer.  If not, see <https://www.gnu.org/licenses/>. - */ - -#include <errno.h> -#include <pipewire/pipewire.h> -#include <signal.h> -#include <stdbool.h> -#include <stddef.h> -#include <stdio.h> -#include <stdlib.h> -#include <string.h> - -#include "audio_context.h" -#include "io_context.h" -#include "proto.h" -#include "util.h" -#include "video_context.h" - -static volatile sig_atomic_t g_signal; -static void OnSignal(int status) { g_signal = status; } - -static bool SetupSignalHandler(int sig, void (*func)(int)) { -  struct sigaction sa = { -      .sa_handler = func, -  }; -  if (sigemptyset(&sa.sa_mask) || sigaddset(&sa.sa_mask, sig)) { -    LOG("Failed to configure signal set (%s)", strerror(errno)); -    return false; -  } -  if (sigaction(sig, &sa, NULL)) { -    LOG("Failed to set signal action (%s)", strerror(errno)); -    return false; -  } -  return true; -} - -static void HandleClientSession(struct IoContext* io_context) { -  struct VideoContext* video_context = VideoContextCreate(io_context); -  if (!video_context) { -    LOG("Failed to create video context"); -    return; -  } - -  struct AudioContext* audio_context = NULL; -  while (!g_signal) { -    struct Proto* proto = IoContextRead(io_context); -    if (!proto) { -      LOG("Failed to read proto"); -      goto leave; -    } - -    switch (proto->header->type) { -      case kProtoTypeHello: -        if (audio_context) { -          LOG("Audio reconfiguration prohibited"); -          proto->Destroy(proto); -          goto leave; -        } -        audio_context = AudioContextCreate(io_context, proto); -        if (!audio_context) { -          LOG("Failed to create audio context"); -          goto leave; -        } -        break; -      case kProtoTypePing: -      case kProtoTypeUhid: -        break; -      default: -        LOG("Unexpected proto received"); -        proto->Destroy(proto); -        goto leave; -    } -  } - -leave: -  if (audio_context) AudioContextDestroy(audio_context); -  VideoContextDestroy(video_context); -} - -int main(int argc, char* argv[]) { -  pw_init(&argc, &argv); -  if (argc < 2) { -    LOG("Usage: streamer <port>"); -    goto leave; -  } - -  int port = atoi(argv[1]); -  if (0 >= port || port > UINT16_MAX) { -    LOG("Invalid port \"%s\"", argv[1]); -    goto leave; -  } - -  SetupSignalHandler(SIGINT, OnSignal); -  SetupSignalHandler(SIGPIPE, SIG_IGN); -  SetupSignalHandler(SIGTERM, OnSignal); - -  while (!g_signal) { -    struct IoContext* io_context = IoContextCreate((uint16_t)port); -    if (!io_context) { -      LOG("Failed to create io context"); -      goto leave; -    } - -    HandleClientSession(io_context); -    IoContextDestroy(io_context); -  } - -leave: -  pw_deinit(); -  bool result = g_signal == SIGINT || g_signal == SIGTERM; -  return result ? EXIT_SUCCESS : EXIT_FAILURE; -} @@ -0,0 +1,76 @@ +/* + * Copyright (C) 2024 Mikhail Burakov. This file is part of streamer. + * + * streamer is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * streamer is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with streamer.  If not, see <https://www.gnu.org/licenses/>. + */ + +#include <pipewire/pipewire.h> + +#include <csignal> +#include <cstdint> +#include <cstdlib> +#include <exception> +#include <iostream> +#include <system_error> + +#include "util.h" + +namespace { + +volatile sig_atomic_t g_signal; +void OnSignal(int status) { g_signal = status; } + +void SetupSignalHandler(int sig, void (*func)(int)) { +  struct sigaction sa {}; +  sa.sa_handler = func; +  if (sigemptyset(&sa.sa_mask) || sigaddset(&sa.sa_mask, sig)) { +    throw std::system_error(errno, std::system_category(), +                            FROM_HERE "Failed to configure signal set"); +  } +  if (sigaction(sig, &sa, NULL)) { +    throw std::system_error(errno, std::system_category(), +                            FROM_HERE "Failed to set signal action"); +  } +} + +}  // namespace + +int main(int argc, char* argv[]) { +  try { +    if (argc < 2) { +      throw std::runtime_error(FROM_HERE "Usage: streamer <port>"); +    } + +    int port = std::atoi(argv[1]); +    if (0 >= port || port > UINT16_MAX) { +      throw std::runtime_error(FROM_HERE "Invalid port number"); +    } + +    pw_init(&argc, &argv); +    Defer defer_pw_deinit([] { pw_deinit(); }); + +    SetupSignalHandler(SIGINT, OnSignal); +    SetupSignalHandler(SIGPIPE, SIG_IGN); +    SetupSignalHandler(SIGTERM, OnSignal); + +    while (!g_signal) { +      // BLAH +    } + +    return EXIT_SUCCESS; +  } catch (const std::exception& ex) { +    std::cerr << ex.what() << std::endl; +    return EXIT_FAILURE; +  } +} @@ -1,6 +1,6 @@  bin:=$(notdir $(shell pwd)) -src:=$(wildcard *.c) -obj:=$(src:.c=.o) +src:=$(wildcard *.cc) +obj:=$(src:.cc=.o)  libs:=\  	egl \ @@ -21,6 +21,9 @@ res:=\  	luma.glsl \  	chroma.glsl +CXXFLAGS+=\ +	-std=c++20 +  obj:=$(patsubst %,%.o,$(protocols)) $(obj)  headers:=$(patsubst %,%.h,$(protocols)) @@ -36,15 +39,15 @@ LDFLAGS+= \  all: $(bin)  $(bin): $(obj) -	$(CC) $^ $(LDFLAGS) -o $@ +	$(CXX) $^ $(LDFLAGS) -o $@ -%.o: %.c *.h $(res) $(headers) -	$(CC) -c $< $(CFLAGS) -o $@ +%.o: %.cc *.h $(res) $(headers) +	$(CXX) -c $< $(CFLAGS) $(CXXFLAGS) -o $@  %.h: $(protocols_dir)/%.xml  	wayland-scanner client-header $< $@ -%.c: $(protocols_dir)/%.xml +%.cc: $(protocols_dir)/%.xml  	wayland-scanner private-code $< $@  clean: @@ -18,15 +18,49 @@  #ifndef STREAMER_UTIL_H_  #define STREAMER_UTIL_H_ -#define MIN(a, b) ((a) < (b) ? (a) : (b)) -#define MAX(a, b) ((a) > (b) ? (a) : (b)) - -#define LENGTH(x) (sizeof(x) / sizeof *(x)) -  #define STR_IMPL(x) #x  #define STR(x) STR_IMPL(x) +#define FROM_HERE __FILE__ ":" STR(__LINE__) " " + +template <class T> +struct Defer { +  Defer(T&& op) : op_{op} {} +  ~Defer() { op_(); } +  const T& op_; +}; + +template <class T> +Defer(T&&) -> Defer<T>; + +template <auto dflt, auto closer> +struct PodCloser { +  struct pointer { +    decltype(dflt) value_; +    pointer() : value_{dflt} {} +    pointer(decltype(nullptr)) : value_{dflt} {} +    pointer(decltype(dflt) value) : value_{value} {} +    explicit operator bool() const { return value_ != dflt; } +    friend bool operator==(pointer, pointer) = default; +    operator decltype(dflt)() const { return value_; } +  }; +  void operator()(const pointer ptr) { +    if (ptr.value_ != dflt) closer(ptr.value_); +  } +}; + +template <class T> +struct ClassOf; + +template <class R, class C> +struct ClassOf<R C::*> { +  using T = C; +}; -#define LOG(x, ...) \ -  fprintf(stderr, __FILE__ ":" STR(__LINE__) " " x "\n", ##__VA_ARGS__) +template <auto fun, class T, class... Args> +inline auto Trampoline(T* head, Args... tail) { +  using C = ClassOf<decltype(fun)>::T; +  auto self = static_cast<C*>(head); +  return (self->*fun)(tail...); +}  #endif  // STREAMER_UTIL_H_ | 
