diff options
author | Mikhail Burakov <mburakov@mailbox.org> | 2023-04-09 09:57:28 +0200 |
---|---|---|
committer | Mikhail Burakov <mburakov@mailbox.org> | 2023-04-09 10:31:04 +0200 |
commit | 9c55db703b6505b1c9bd2a731a98116447d48fd3 (patch) | |
tree | 1c72f4883d3ba0adbf3185ed3a192a1a8fb49e56 | |
parent | d0c6311e9c9547a9ee10310edf0c47230e15ccbb (diff) |
Switch from stdin to sockets and use io muxer
-rw-r--r-- | capture.c | 2 | ||||
-rw-r--r-- | encode.c | 34 | ||||
-rw-r--r-- | encode.h | 5 | ||||
-rw-r--r-- | main.c | 330 | ||||
-rw-r--r-- | makefile | 2 |
5 files changed, 220 insertions, 153 deletions
@@ -189,7 +189,7 @@ rollback_crtc: } void CaptureContextDestroy(struct CaptureContext* capture_context) { - GpuFrameDestroy(capture_context->gpu_frame); + if (capture_context->gpu_frame) GpuFrameDestroy(capture_context->gpu_frame); drmClose(capture_context->drm_fd); free(capture_context); } @@ -31,7 +31,6 @@ #include <va/va_drmcommon.h> #include "gpu.h" -#include "toolbox/perf.h" #include "toolbox/utils.h" struct EncodeContext { @@ -231,17 +230,10 @@ static bool DrainPacket(const struct AVPacket* packet, int fd) { }; for (;;) { ssize_t result = writev(fd, iov, LENGTH(iov)); - switch (result) { - case -1: - if (errno == EINTR) continue; - if (errno == EPIPE) return true; - LOG("Failed to write packed (%s)", strerror(errno)); - return false; - case 0: - LOG("Output file descriptor closed"); - return false; - default: - break; + if (result < 0) { + if (errno == EINTR) continue; + LOG("Failed to write (%s)", strerror(errno)); + return false; } for (size_t i = 0; i < LENGTH(iov); i++) { size_t delta = MIN((size_t)result, iov[i].iov_len); @@ -253,11 +245,8 @@ static bool DrainPacket(const struct AVPacket* packet, int fd) { } } -bool EncodeContextEncodeFrame(struct EncodeContext* encode_context, int fd, - struct TimingStats* encode, - struct TimingStats* drain) { +bool EncodeContextEncodeFrame(struct EncodeContext* encode_context, int fd) { bool result = false; - unsigned long long before_send = MicrosNow(); if (encode_context->gpu_frame) { GpuFrameDestroy(encode_context->gpu_frame); encode_context->gpu_frame = NULL; @@ -275,20 +264,13 @@ bool EncodeContextEncodeFrame(struct EncodeContext* encode_context, int fd, goto rollback_packet; } - unsigned long long total_send = MicrosNow() - before_send; - unsigned long long total_receive = 0; - unsigned long long total_drain = 0; for (;;) { - unsigned long long before_receive = MicrosNow(); err = avcodec_receive_packet(encode_context->codec_context, packet); switch (err) { case 0: break; case AVERROR(EAGAIN): case AVERROR_EOF: - total_receive += MicrosNow() - before_receive; - if (encode) TimingStatsRecord(encode, total_send + total_receive); - if (drain) TimingStatsRecord(drain, total_drain); result = true; goto rollback_packet; default: @@ -297,16 +279,12 @@ bool EncodeContextEncodeFrame(struct EncodeContext* encode_context, int fd, } packet->stream_index = 0; - unsigned long long before_drain = MicrosNow(); bool result = DrainPacket(packet, fd); av_packet_unref(packet); if (!result) { - LOG("Failed to write full packet (%s)", strerror(errno)); + LOG("Failed to drain packet"); goto rollback_packet; } - - total_receive += before_drain - before_receive; - total_drain += MicrosNow() - before_drain; } rollback_packet: @@ -26,7 +26,6 @@ struct EncodeContext; struct GpuContext; struct GpuFrame; -struct TimingStats; struct EncodeContext* EncodeContextCreate(struct GpuContext* gpu_context, uint32_t width, uint32_t height, @@ -34,9 +33,7 @@ struct EncodeContext* EncodeContextCreate(struct GpuContext* gpu_context, enum YuvRange range); const struct GpuFrame* EncodeContextGetFrame( struct EncodeContext* encode_context); -bool EncodeContextEncodeFrame(struct EncodeContext* encode_context, int fd, - struct TimingStats* encode, - struct TimingStats* drain); +bool EncodeContextEncodeFrame(struct EncodeContext* encode_context, int fd); void EncodeContextDestroy(struct EncodeContext* encode_context); #endif // STREAMER_ENCODE_H_ @@ -16,17 +16,21 @@ */ #include <errno.h> +#include <netinet/in.h> #include <signal.h> +#include <stdint.h> #include <stdio.h> #include <stdlib.h> #include <string.h> +#include <sys/timerfd.h> +#include <time.h> #include <unistd.h> #include "capture.h" #include "colorspace.h" #include "encode.h" #include "gpu.h" -#include "toolbox/perf.h" +#include "toolbox/io_muxer.h" #include "toolbox/utils.h" // TODO(mburakov): Currently zwp_linux_dmabuf_v1 has no way to provide @@ -35,148 +39,236 @@ // https://gitlab.freedesktop.org/wayland/wayland-protocols/-/merge_requests/183 static const enum YuvColorspace colorspace = kItuRec601; static const enum YuvRange range = kNarrowRange; +static const int capture_period = 1000000000 / 60; static volatile sig_atomic_t g_signal; static void OnSignal(int status) { g_signal = status; } -static void GpuContextDtor(struct GpuContext** gpu_context) { - if (!*gpu_context) return; - GpuContextDestroy(*gpu_context); - *gpu_context = NULL; -} +struct Contexts { + struct IoMuxer io_muxer; + int timer_fd; + int server_fd; + struct GpuContext* gpu_context; + struct CaptureContext* capture_context; + struct EncodeContext* encode_context; + int client_fd; +}; + +static int CreateServerSocket(const char* arg) { + int port = atoi(arg); + if (0 > port || port > UINT16_MAX) { + LOG("Invalid port number argument"); + return -1; + } + int sock = socket(AF_INET, SOCK_STREAM, 0); + if (sock < 0) { + LOG("Failed to create socket (%s)", strerror(errno)); + return -1; + } + if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &(int){1}, sizeof(int))) { + LOG("Failed to reuse socket address (%s)", strerror(errno)); + goto rollback_sock; + } + const struct sockaddr_in addr = { + .sin_family = AF_INET, + .sin_port = htons((uint16_t)port), + }; + if (bind(sock, (const struct sockaddr*)&addr, sizeof(addr))) { + LOG("Failed to bind socket (%s)", strerror(errno)); + goto rollback_sock; + } + if (listen(sock, SOMAXCONN)) { + LOG("Failed to listen socket (%s)", strerror(errno)); + goto rollback_sock; + } + return sock; -static void CaptureContextDtor(struct CaptureContext** capture_context) { - if (!*capture_context) return; - CaptureContextDestroy(*capture_context); - *capture_context = NULL; +rollback_sock: + close(sock); + return -1; } -static void EncodeContextDtor(struct EncodeContext** encode_context) { - if (!*encode_context) return; - EncodeContextDestroy(*encode_context); - *encode_context = NULL; +static void OnTimerExpire(void* user) { + struct Contexts* contexts = user; + if (!IoMuxerOnRead(&contexts->io_muxer, contexts->timer_fd, &OnTimerExpire, + user)) { + LOG("Failed to reschedule timer (%s)", strerror(errno)); + g_signal = SIGABRT; + return; + } + uint64_t expirations; + if (read(contexts->timer_fd, &expirations, sizeof(expirations)) != + sizeof(expirations)) { + LOG("Failed to read timer expirations (%s)", strerror(errno)); + g_signal = SIGABRT; + return; + } + if (contexts->client_fd == -1) { + // TODO(mburakov): Is this actually possible? + LOG("Timer expired with disconnected client"); + return; + } + + const struct GpuFrame* captured_frame = + CaptureContextGetFrame(contexts->capture_context); + if (!captured_frame) { + LOG("Failed to capture frame"); + goto drop_client; + } + + if (!contexts->encode_context) { + uint32_t width, height; + GpuFrameGetSize(captured_frame, &width, &height); + contexts->encode_context = EncodeContextCreate(contexts->gpu_context, width, + height, colorspace, range); + if (!contexts->encode_context) { + LOG("Failed to create encode context"); + goto drop_client; + } + } + + const struct GpuFrame* encoded_frame = + EncodeContextGetFrame(contexts->encode_context); + if (!encoded_frame) { + LOG("Failed to get encoded frame"); + goto drop_client; + } + if (!GpuFrameConvert(captured_frame, encoded_frame)) { + LOG("Failed to convert frame"); + goto drop_client; + } + + GpuContextSync(contexts->gpu_context); + if (!EncodeContextEncodeFrame(contexts->encode_context, + contexts->client_fd)) { + LOG("Failed to encode frame"); + goto drop_client; + } + return; + +drop_client:; + static const struct itimerspec spec = {0}; + if (timerfd_settime(contexts->timer_fd, 0, &spec, NULL)) { + LOG("Failed to disarm timer (%s)", strerror(errno)); + g_signal = SIGABRT; + } + if (contexts->encode_context) { + EncodeContextDestroy(contexts->encode_context); + contexts->encode_context = NULL; + } + if (contexts->client_fd != -1) { + close(contexts->client_fd); + contexts->client_fd = -1; + } } -static void TimingStatsLog(const struct TimingStats* timing_stats, - const char* name) { - LOG("%s min/avg/max: %llu/%llu/%llu", name, timing_stats->min, - timing_stats->sum / timing_stats->counter, timing_stats->max); +static void OnClientConnecting(void* user) { + struct Contexts* contexts = user; + if (!IoMuxerOnRead(&contexts->io_muxer, contexts->server_fd, + &OnClientConnecting, user)) { + LOG("Failed to reschedule accept (%s)", strerror(errno)); + g_signal = SIGABRT; + return; + } + int client_fd = accept(contexts->server_fd, NULL, NULL); + if (client_fd < 0) { + LOG("Failed to accept client (%s)", strerror(errno)); + g_signal = SIGABRT; + return; + } + + if (contexts->client_fd != -1) { + LOG("One client is already connected"); + close(client_fd); + return; + } + + contexts->client_fd = client_fd; + static const struct itimerspec spec = { + .it_interval.tv_nsec = capture_period, + .it_value.tv_nsec = capture_period, + }; + if (timerfd_settime(contexts->timer_fd, 0, &spec, NULL)) { + LOG("Failed to arm timer (%s)", strerror(errno)); + goto rollback_client_fd; + } + return; + +rollback_client_fd: + close(contexts->client_fd); + contexts->client_fd = -1; + return; } int main(int argc, char* argv[]) { - (void)argc; - (void)argv; - + if (argc < 2) { + LOG("Usage: %s <port>", argv[0]); + return EXIT_FAILURE; + } if (signal(SIGINT, OnSignal) == SIG_ERR || - signal(SIGPIPE, OnSignal) == SIG_ERR || + signal(SIGPIPE, SIG_IGN) == SIG_ERR || signal(SIGTERM, OnSignal) == SIG_ERR) { LOG("Failed to set signal handlers (%s)", strerror(errno)); return EXIT_FAILURE; } - struct GpuContext __attribute__((cleanup(GpuContextDtor)))* gpu_context = - GpuContextCreate(colorspace, range); - if (!gpu_context) { + struct Contexts contexts = { + .timer_fd = -1, + .server_fd = -1, + .client_fd = -1, + }; + IoMuxerCreate(&contexts.io_muxer); + contexts.server_fd = CreateServerSocket(argv[1]); + if (contexts.server_fd == -1) { + LOG("Failed to create server socket"); + goto rollback_io_muxer; + } + contexts.timer_fd = timerfd_create(CLOCK_MONOTONIC, 0); + if (contexts.timer_fd == -1) { + LOG("Failed to create timer (%s)", strerror(errno)); + goto rollback_server_fd; + } + contexts.gpu_context = GpuContextCreate(colorspace, range); + if (!contexts.gpu_context) { LOG("Failed to create gpu context"); - return EXIT_FAILURE; + goto rollback_timer_fd; } - - struct CaptureContext - __attribute__((cleanup(CaptureContextDtor)))* capture_context = - CaptureContextCreate(gpu_context); - if (!capture_context) { + contexts.capture_context = CaptureContextCreate(contexts.gpu_context); + if (!contexts.capture_context) { LOG("Failed to create capture context"); - return EXIT_FAILURE; + goto rollback_gpu_context; } - struct EncodeContext - __attribute__((cleanup(EncodeContextDtor)))* encode_context = NULL; - - struct TimingStats capture; - struct TimingStats convert; - struct TimingStats encode; - struct TimingStats drain; - struct TimingStats total; - TimingStatsReset(&capture); - TimingStatsReset(&convert); - TimingStatsReset(&encode); - TimingStatsReset(&drain); - TimingStatsReset(&total); - - unsigned long long recording_started = MicrosNow(); - static const unsigned long long delta = 1000000ull / 60ull; - for (unsigned long long next = MicrosNow() + delta; !g_signal; - next += delta) { - unsigned long long before_capture = MicrosNow(); - const struct GpuFrame* captured_frame = - CaptureContextGetFrame(capture_context); - if (!captured_frame) { - LOG("Failed to capture frame"); - return EXIT_FAILURE; - } - - if (!encode_context) { - uint32_t width, height; - GpuFrameGetSize(captured_frame, &width, &height); - encode_context = - EncodeContextCreate(gpu_context, width, height, colorspace, range); - if (!encode_context) { - LOG("Failed to create encode context"); - return EXIT_FAILURE; - } - } - - const struct GpuFrame* encoded_frame = - EncodeContextGetFrame(encode_context); - if (!encoded_frame) { - LOG("Failed to get encoded frame"); - return EXIT_FAILURE; - } - - unsigned long long before_convert = MicrosNow(); - if (!GpuFrameConvert(captured_frame, encoded_frame)) { - LOG("Failed to convert frame"); - return EXIT_FAILURE; - } - - GpuContextSync(gpu_context); - unsigned long long before_encode = MicrosNow(); - if (!EncodeContextEncodeFrame(encode_context, STDOUT_FILENO, &encode, - &drain)) { - LOG("Failed to encode frame"); - return EXIT_FAILURE; - } - - unsigned long long now = MicrosNow(); - TimingStatsRecord(&capture, before_convert - before_capture); - TimingStatsRecord(&convert, before_encode - before_convert); - TimingStatsRecord(&total, now - before_capture); - - unsigned long long period = now - recording_started; - static const unsigned long long second = 1000000; - if (period > 10 * second) { - LOG("---->8-------->8-------->8----"); - TimingStatsLog(&capture, "Capture"); - TimingStatsLog(&convert, "Convert"); - TimingStatsLog(&encode, "Encode"); - TimingStatsLog(&drain, "Drain"); - TimingStatsLog(&total, "Total"); - TimingStatsReset(&capture); - TimingStatsReset(&convert); - TimingStatsReset(&encode); - TimingStatsReset(&drain); - TimingStatsReset(&total); - recording_started = now; + if (!IoMuxerOnRead(&contexts.io_muxer, contexts.timer_fd, &OnTimerExpire, + &contexts)) { + LOG("Failed to schedule timer (%s)", strerror(errno)); + goto rollback_capture_context; + } + if (!IoMuxerOnRead(&contexts.io_muxer, contexts.server_fd, + &OnClientConnecting, &contexts)) { + LOG("Failed to schedule accept (%s)", strerror(errno)); + goto rollback_capture_context; + } + while (!g_signal) { + if (IoMuxerIterate(&contexts.io_muxer, -1) && errno != EINTR) { + LOG("Failed to iterate io muxer (%s)", strerror(errno)); + g_signal = SIGABRT; } - - now = MicrosNow(); - unsigned long long micros = now < next ? next - now : 0; - if (micros) usleep((unsigned)micros); } - if (!EncodeContextEncodeFrame(encode_context, STDOUT_FILENO, NULL, NULL)) { - LOG("Failed to drain encoder"); - return EXIT_FAILURE; - } - return EXIT_SUCCESS; + if (contexts.encode_context) EncodeContextDestroy(contexts.encode_context); + if (contexts.client_fd != -1) close(contexts.client_fd); + +rollback_capture_context: + CaptureContextDestroy(contexts.capture_context); +rollback_gpu_context: + GpuContextDestroy(contexts.gpu_context); +rollback_timer_fd: + close(contexts.timer_fd); +rollback_server_fd: + close(contexts.server_fd); +rollback_io_muxer: + IoMuxerDestroy(&contexts.io_muxer); + bool result = g_signal == SIGINT || g_signal == SIGTERM; + return result ? EXIT_SUCCESS : EXIT_FAILURE; } @@ -3,7 +3,7 @@ src:=$(shell ls *.c) obj:=$(src:.c=.o) obj+=\ - toolbox/perf.o + toolbox/io_muxer.o libs:=\ egl \ |