summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--decode.c47
-rw-r--r--decode.h10
-rw-r--r--main.c283
-rw-r--r--proto.h41
4 files changed, 224 insertions, 157 deletions
diff --git a/decode.c b/decode.c
index 2f86a3f..6bf49e6 100644
--- a/decode.c
+++ b/decode.c
@@ -30,7 +30,6 @@
#include <va/va_drmcommon.h>
#include "frame.h"
-#include "toolbox/buffer.h"
#include "toolbox/utils.h"
#include "window.h"
@@ -48,11 +47,7 @@ struct DecodeContext {
int drm_fd;
VADisplay va_display;
mfxSession mfx_session;
-
- struct Buffer buffer;
struct Surface** surfaces;
-
- size_t bitrate;
};
static const char* VaStatusString(VAStatus status) {
@@ -411,35 +406,14 @@ static size_t UnlockAllSurfaces(struct DecodeContext* decode_context,
return result;
}
-bool DecodeContextDecode(struct DecodeContext* decode_context, int fd) {
- switch (BufferAppendFrom(&decode_context->buffer, fd)) {
- case -1:
- LOG("Failed to append packet data to buffer (%s)", strerror(errno));
- return false;
- case 0:
- LOG("Server closed connection");
- return false;
- default:
- break;
- }
-
-again:
- if (decode_context->buffer.size < sizeof(uint32_t)) {
- // mburakov: Packet size is not yet available.
- return true;
- }
- uint32_t packet_size = *(uint32_t*)decode_context->buffer.data;
- if (decode_context->buffer.size < sizeof(uint32_t) + packet_size) {
- // mburakov: Full packet is not yet available.
- return true;
- }
-
+bool DecodeContextDecode(struct DecodeContext* decode_context,
+ const void* buffer, size_t size) {
mfxBitstream bitstream = {
.DecodeTimeStamp = MFX_TIMESTAMP_UNKNOWN,
.TimeStamp = (mfxU64)MFX_TIMESTAMP_UNKNOWN,
- .Data = (mfxU8*)decode_context->buffer.data + sizeof(uint32_t),
- .DataLength = packet_size,
- .MaxLength = packet_size,
+ .Data = (void*)(ptrdiff_t)buffer,
+ .DataLength = (mfxU32)size,
+ .MaxLength = (mfxU32)size,
.DataFlag = MFX_BITSTREAM_COMPLETE_FRAME,
};
@@ -493,20 +467,11 @@ again:
return false;
}
- BufferDiscard(&decode_context->buffer, sizeof(uint32_t) + packet_size);
- decode_context->bitrate += (sizeof(uint32_t) + packet_size) * 8;
- goto again;
+ return true;
}
}
-void DecodeContextGetStats(struct DecodeContext* decode_context,
- struct DecodeStats* decode_stats) {
- decode_stats->bitrate = decode_context->bitrate;
- decode_context->bitrate = 0;
-}
-
void DecodeContextDestroy(struct DecodeContext* decode_context) {
- BufferDestroy(&decode_context->buffer);
MFXClose(decode_context->mfx_session);
vaTerminate(decode_context->va_display);
close(decode_context->drm_fd);
diff --git a/decode.h b/decode.h
index 730e926..e2ea3ba 100644
--- a/decode.h
+++ b/decode.h
@@ -22,17 +22,11 @@
#include <stddef.h>
struct DecodeContext;
-struct Frame;
struct Window;
-struct DecodeStats {
- size_t bitrate;
-};
-
struct DecodeContext* DecodeContextCreate(struct Window* window);
-bool DecodeContextDecode(struct DecodeContext* decode_context, int fd);
-void DecodeContextGetStats(struct DecodeContext* decode_context,
- struct DecodeStats* decode_stats);
+bool DecodeContextDecode(struct DecodeContext* decode_context,
+ const void* buffer, size_t size);
void DecodeContextDestroy(struct DecodeContext* decode_context);
#endif // RECEIVER_DECODE_H_
diff --git a/main.c b/main.c
index 7e57929..1252566 100644
--- a/main.c
+++ b/main.c
@@ -31,20 +31,28 @@
#include "decode.h"
#include "input.h"
+#include "proto.h"
#include "pui/font.h"
+#include "toolbox/buffer.h"
#include "toolbox/perf.h"
#include "toolbox/utils.h"
#include "window.h"
static volatile sig_atomic_t g_signal;
static void OnSignal(int status) { g_signal = status; }
-static size_t overlay_width, overlay_height;
-static void SocketDtor(int* sock) {
- if (*sock == -1) return;
- close(*sock);
- *sock = -1;
-}
+struct Context {
+ struct InputStream* input_stream;
+ struct Window* window;
+ size_t overlay_width;
+ size_t overlay_height;
+ struct Overlay* overlay;
+ struct DecodeContext* decode_context;
+ struct Buffer buffer;
+
+ size_t video_bitstream;
+ uint64_t timestamp;
+};
static int ConnectSocket(const char* arg) {
uint16_t port;
@@ -122,30 +130,6 @@ static void OnWindowWheel(void* user, int delta) {
}
}
-static void InputStreamDtor(struct InputStream** input_stream) {
- if (!*input_stream) return;
- InputStreamDestroy(*input_stream);
- *input_stream = NULL;
-}
-
-static void WindowDtor(struct Window** window) {
- if (!*window) return;
- WindowDestroy(*window);
- *window = NULL;
-}
-
-static void OverlayDtor(struct Overlay** overlay) {
- if (!*overlay) return;
- OverlayDestroy(*overlay);
- *overlay = NULL;
-}
-
-static void DecodeContextDtor(struct DecodeContext** decode_context) {
- if (!*decode_context) return;
- DecodeContextDestroy(*decode_context);
- *decode_context = NULL;
-}
-
static void GetMaxOverlaySize(size_t* width, size_t* height) {
char str[64];
snprintf(str, sizeof(str), "Bitrate: %zu.000 Mbps", SIZE_MAX / 1000);
@@ -153,29 +137,153 @@ static void GetMaxOverlaySize(size_t* width, size_t* height) {
*height = 20;
}
-static void RenderOverlay(struct Overlay* overlay, uint64_t clock_delta,
- const struct DecodeStats* decode_stats) {
- uint32_t* buffer = OverlayLock(overlay);
+static struct Context* ContextCreate(int sock, bool no_input, bool stats) {
+ struct Context* context = calloc(1, sizeof(struct Context));
+ if (!context) {
+ LOG("Failed to allocate context (%s)", strerror(errno));
+ return NULL;
+ }
+
+ const struct WindowEventHandlers* maybe_window_event_handlers = NULL;
+ if (!no_input) {
+ context->input_stream = InputStreamCreate(sock);
+ if (!context->input_stream) {
+ LOG("Failed to create input stream");
+ goto rollback_context;
+ }
+ static const struct WindowEventHandlers window_event_handlers = {
+ .OnClose = OnWindowClose,
+ .OnFocus = OnWindowFocus,
+ .OnKey = OnWindowKey,
+ .OnMove = OnWindowMove,
+ .OnButton = OnWindowButton,
+ .OnWheel = OnWindowWheel,
+ };
+ maybe_window_event_handlers = &window_event_handlers;
+ }
+
+ context->window =
+ WindowCreate(maybe_window_event_handlers, context->input_stream);
+ if (!context->window) {
+ LOG("Failed to create window");
+ goto rollback_input_stream;
+ }
+
+ if (stats) {
+ GetMaxOverlaySize(&context->overlay_width, &context->overlay_height);
+ context->overlay =
+ OverlayCreate(context->window, 4, 4, (int)context->overlay_width,
+ (int)context->overlay_height);
+ if (!context->overlay) {
+ LOG("Failed to create stats overlay");
+ goto rollback_window;
+ }
+ }
+
+ context->decode_context = DecodeContextCreate(context->window);
+ if (!context->decode_context) {
+ LOG("Failed to create decode context");
+ goto rollback_overlay;
+ }
+ return context;
+
+rollback_overlay:
+ if (context->overlay) OverlayDestroy(context->overlay);
+rollback_window:
+ WindowDestroy(context->window);
+rollback_input_stream:
+ if (context->input_stream) InputStreamDestroy(context->input_stream);
+rollback_context:
+ free(context);
+ return NULL;
+}
+
+static bool RenderOverlay(struct Context* context, uint64_t timestamp) {
+ uint32_t* buffer = OverlayLock(context->overlay);
if (!buffer) {
LOG("Failed to lock overlay");
- return;
+ return false;
}
char str[64];
- size_t bitrate = decode_stats->bitrate * 1000000 / clock_delta / 1024;
+ uint64_t clock_delta = timestamp - context->timestamp;
+ size_t bitrate = context->video_bitstream * 1000000 / clock_delta / 1024;
snprintf(str, sizeof(str), "Bitrate: %zu.%03zu Mbps", bitrate / 1000,
bitrate % 1000);
- size_t width = PuiStringWidth(str) + 8;
- memset(buffer, 0, overlay_width * overlay_height * 4);
- for (size_t y = 0; y < overlay_height; y++) {
- for (size_t x = 0; x < width; x++)
- buffer[x + y * overlay_width] = 0x40000000;
+ size_t overlay_width = PuiStringWidth(str) + 8;
+ memset(buffer, 0, context->overlay_width * context->overlay_height * 4);
+ for (size_t y = 0; y < context->overlay_height; y++) {
+ for (size_t x = 0; x < overlay_width; x++)
+ buffer[x + y * context->overlay_width] = 0x40000000;
}
- size_t voffset = overlay_width * 4;
- PuiStringRender(str, buffer + voffset + 4, overlay_width, 0xffffffff);
- OverlayUnlock(overlay);
+ size_t voffset = context->overlay_width * 4;
+ PuiStringRender(str, buffer + voffset + 4, context->overlay_width,
+ 0xffffffff);
+ OverlayUnlock(context->overlay);
+ return true;
+}
+
+static bool HandleVideoStream(struct Context* context) {
+ const struct Proto* proto = context->buffer.data;
+ if (!DecodeContextDecode(context->decode_context, proto->data, proto->size)) {
+ LOG("Failed to decode incoming video data");
+ return false;
+ }
+
+ if (!context->overlay) return true;
+ if (!context->timestamp) {
+ context->timestamp = MicrosNow();
+ return true;
+ }
+
+ context->video_bitstream += proto->size;
+ if (!(proto->flags & PROTO_FLAG_KEYFRAME)) return true;
+
+ uint64_t timestamp = MicrosNow();
+ if (!RenderOverlay(context, timestamp)) LOG("Failed to render overlay");
+ context->video_bitstream = 0;
+ context->timestamp = timestamp;
+ return true;
+}
+
+static bool DemuxProtoStream(int sock, struct Context* context) {
+ switch (BufferAppendFrom(&context->buffer, sock)) {
+ case -1:
+ LOG("Failed to append packet data to buffer (%s)", strerror(errno));
+ return false;
+ case 0:
+ LOG("Server closed connection");
+ return false;
+ default:
+ break;
+ }
+
+again:
+ if (context->buffer.size < sizeof(struct Proto)) return true;
+ const struct Proto* proto = context->buffer.data;
+ if (context->buffer.size < sizeof(struct Proto) + proto->size) return true;
+
+ switch (proto->type) {
+ case PROTO_TYPE_VIDEO:
+ if (!HandleVideoStream(context)) {
+ LOG("Failed to handle video stream");
+ return false;
+ }
+ break;
+ }
+
+ BufferDiscard(&context->buffer, sizeof(struct Proto) + proto->size);
+ goto again;
+}
+
+static void ContextDestroy(struct Context* context) {
+ BufferDestroy(&context->buffer);
+ DecodeContextDestroy(context->decode_context);
+ if (context->overlay) OverlayDestroy(context->overlay);
+ WindowDestroy(context->window);
+ if (context->input_stream) InputStreamDestroy(context->input_stream);
}
int main(int argc, char* argv[]) {
@@ -183,8 +291,12 @@ int main(int argc, char* argv[]) {
LOG("Usage: %s <ip>:<port> [--no-input] [--stats]", argv[0]);
return EXIT_FAILURE;
}
- int __attribute__((cleanup(SocketDtor))) sock = ConnectSocket(argv[1]);
- if (sock == -1) return EXIT_FAILURE;
+
+ int sock = ConnectSocket(argv[1]);
+ if (sock == -1) {
+ LOG("Failed to connect socket");
+ return EXIT_FAILURE;
+ }
bool no_input = false;
bool stats = false;
@@ -195,55 +307,17 @@ int main(int argc, char* argv[]) {
stats = true;
}
}
- struct InputStream
- __attribute__((cleanup(InputStreamDtor)))* maybe_input_stream = NULL;
- const struct WindowEventHandlers* maybe_window_event_handlers = NULL;
- if (!no_input) {
- maybe_input_stream = InputStreamCreate(sock);
- if (!maybe_input_stream) {
- LOG("Failed to create input stream");
- return EXIT_FAILURE;
- }
- static const struct WindowEventHandlers window_event_handlers = {
- .OnClose = OnWindowClose,
- .OnFocus = OnWindowFocus,
- .OnKey = OnWindowKey,
- .OnMove = OnWindowMove,
- .OnButton = OnWindowButton,
- .OnWheel = OnWindowWheel,
- };
- maybe_window_event_handlers = &window_event_handlers;
- }
- struct Window __attribute__((cleanup(WindowDtor)))* window =
- WindowCreate(maybe_window_event_handlers, maybe_input_stream);
- if (!window) {
- LOG("Failed to create window");
- return EXIT_FAILURE;
- }
- struct Overlay __attribute__((cleanup(OverlayDtor)))* overlay = NULL;
- if (stats) {
- GetMaxOverlaySize(&overlay_width, &overlay_height);
- overlay =
- OverlayCreate(window, 4, 4, (int)overlay_width, (int)overlay_height);
- if (!overlay) {
- LOG("Failed to create overlay");
- return EXIT_FAILURE;
- }
- }
-
- struct DecodeContext
- __attribute__((cleanup(DecodeContextDtor)))* decode_context =
- DecodeContextCreate(window);
- if (!decode_context) {
- LOG("Failed to create decode context");
- return EXIT_FAILURE;
+ struct Context* context = ContextCreate(sock, no_input, stats);
+ if (!context) {
+ LOG("Failed to create context");
+ goto rollback_socket;
}
- int events_fd = WindowGetEventsFd(window);
+ int events_fd = WindowGetEventsFd(context->window);
if (events_fd == -1) {
LOG("Failed to get events fd");
- return EXIT_FAILURE;
+ goto rollback_context;
}
if (signal(SIGINT, OnSignal) == SIG_ERR ||
@@ -252,7 +326,6 @@ int main(int argc, char* argv[]) {
return EXIT_FAILURE;
}
- uint64_t before = MicrosNow();
while (!g_signal) {
struct pollfd pfds[] = {
{.fd = sock, .events = POLLIN},
@@ -262,7 +335,7 @@ int main(int argc, char* argv[]) {
case -1:
if (errno != EINTR) {
LOG("Failed to poll (%s)", strerror(errno));
- return EXIT_FAILURE;
+ goto rollback_context;
}
__attribute__((fallthrough));
case 0:
@@ -270,26 +343,20 @@ int main(int argc, char* argv[]) {
default:
break;
}
- if (overlay) {
- uint64_t after = MicrosNow();
- static const uint64_t timeout = 5 * 1000 * 1000;
- uint64_t time_delta = after - before;
- if (time_delta > timeout) {
- struct DecodeStats decode_stats;
- DecodeContextGetStats(decode_context, &decode_stats);
- RenderOverlay(overlay, time_delta, &decode_stats);
- before = after;
- }
+ if (pfds[0].revents && !DemuxProtoStream(sock, context)) {
+ LOG("Failed to demux proto stream");
+ goto rollback_context;
}
- if (pfds[0].revents && !DecodeContextDecode(decode_context, sock)) {
- LOG("Failed to decode incoming data");
- return EXIT_FAILURE;
- }
- if (pfds[1].revents && !WindowProcessEvents(window)) {
+ if (pfds[1].revents && !WindowProcessEvents(context->window)) {
LOG("Failed to process window events");
- return EXIT_FAILURE;
+ goto rollback_context;
}
}
- return EXIT_SUCCESS;
+rollback_context:
+ ContextDestroy(context);
+rollback_socket:
+ close(sock);
+ bool result = g_signal == SIGINT || g_signal == SIGTERM;
+ return result ? EXIT_SUCCESS : EXIT_FAILURE;
}
diff --git a/proto.h b/proto.h
new file mode 100644
index 0000000..047d834
--- /dev/null
+++ b/proto.h
@@ -0,0 +1,41 @@
+/*
+ * Copyright (C) 2023 Mikhail Burakov. This file is part of receiver.
+ *
+ * receiver is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * receiver is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with receiver. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#ifndef RECEIVER_PROTO_H_
+#define RECEIVER_PROTO_H_
+
+#include <assert.h>
+#include <stdint.h>
+
+#define PROTO_TYPE_MISC 0
+#define PROTO_TYPE_VIDEO 1
+#define PROTO_TYPE_AUDIO 2
+
+#define PROTO_FLAG_KEYFRAME 1
+
+struct Proto {
+ uint32_t size;
+ uint8_t type;
+ uint8_t flags;
+ uint16_t latency;
+ uint8_t data[];
+};
+
+static_assert(sizeof(struct Proto) == 8 * sizeof(uint8_t),
+ "Suspicious proto struct size");
+
+#endif // RECEIVER_PROTO_H_