summaryrefslogtreecommitdiff
path: root/main.c
diff options
context:
space:
mode:
authorMikhail Burakov <mburakov@mailbox.org>2023-04-09 12:53:27 +0200
committerMikhail Burakov <mburakov@mailbox.org>2023-04-09 12:53:27 +0200
commit3e3ff72e71e7ba1f8c8d37c47a0f1e10db7aa96f (patch)
treee87028eca75709355d2c7d17df0730fc982b3a46 /main.c
parent9c55db703b6505b1c9bd2a731a98116447d48fd3 (diff)
Add input pipeline handling to streamer
Diffstat (limited to 'main.c')
-rw-r--r--main.c101
1 files changed, 81 insertions, 20 deletions
diff --git a/main.c b/main.c
index 6b32209..121085a 100644
--- a/main.c
+++ b/main.c
@@ -30,6 +30,7 @@
#include "colorspace.h"
#include "encode.h"
#include "gpu.h"
+#include "input.h"
#include "toolbox/io_muxer.h"
#include "toolbox/utils.h"
@@ -50,8 +51,10 @@ struct Contexts {
int server_fd;
struct GpuContext* gpu_context;
struct CaptureContext* capture_context;
- struct EncodeContext* encode_context;
+
int client_fd;
+ struct InputHandler* input_handler;
+ struct EncodeContext* encode_context;
};
static int CreateServerSocket(const char* arg) {
@@ -88,6 +91,29 @@ rollback_sock:
return -1;
}
+static void MaybeDropClient(struct Contexts* contexts) {
+ static const struct itimerspec spec = {0};
+ if (contexts->encode_context) {
+ EncodeContextDestroy(contexts->encode_context);
+ contexts->encode_context = NULL;
+ }
+ if (contexts->input_handler) {
+ IoMuxerForget(&contexts->io_muxer,
+ InputHandlerGetEventsFd(contexts->input_handler));
+ InputHandlerDestroy(contexts->input_handler);
+ contexts->input_handler = NULL;
+ }
+ if (contexts->client_fd != -1) {
+ IoMuxerForget(&contexts->io_muxer, contexts->client_fd);
+ close(contexts->client_fd);
+ contexts->client_fd = -1;
+ }
+ if (timerfd_settime(contexts->timer_fd, 0, &spec, NULL)) {
+ LOG("Failed to disarm timer (%s)", strerror(errno));
+ g_signal = SIGABRT;
+ }
+}
+
static void OnTimerExpire(void* user) {
struct Contexts* contexts = user;
if (!IoMuxerOnRead(&contexts->io_muxer, contexts->timer_fd, &OnTimerExpire,
@@ -146,20 +172,43 @@ static void OnTimerExpire(void* user) {
}
return;
-drop_client:;
- static const struct itimerspec spec = {0};
- if (timerfd_settime(contexts->timer_fd, 0, &spec, NULL)) {
- LOG("Failed to disarm timer (%s)", strerror(errno));
- g_signal = SIGABRT;
+drop_client:
+ MaybeDropClient(contexts);
+}
+
+static void OnClientWriting(void* user) {
+ struct Contexts* contexts = user;
+ if (!IoMuxerOnRead(&contexts->io_muxer, contexts->client_fd, &OnClientWriting,
+ user)) {
+ LOG("Failed to reschedule client reading (%s)", strerror(errno));
+ goto drop_client;
}
- if (contexts->encode_context) {
- EncodeContextDestroy(contexts->encode_context);
- contexts->encode_context = NULL;
+ if (!InputHandlerHandle(contexts->input_handler, contexts->client_fd)) {
+ LOG("Failed to handle client input");
+ goto drop_client;
}
- if (contexts->client_fd != -1) {
- close(contexts->client_fd);
- contexts->client_fd = -1;
+ return;
+
+drop_client:
+ MaybeDropClient(contexts);
+}
+
+static void OnInputEvents(void* user) {
+ struct Contexts* contexts = user;
+ if (!IoMuxerOnRead(&contexts->io_muxer,
+ InputHandlerGetEventsFd(contexts->input_handler),
+ &OnInputEvents, user)) {
+ LOG("Failed to reschedule events reading (%s)", strerror(errno));
+ goto drop_client;
}
+ if (!InputHandlerProcessEvents(contexts->input_handler)) {
+ LOG("Failed to process events");
+ goto drop_client;
+ }
+ return;
+
+drop_client:
+ MaybeDropClient(contexts);
}
static void OnClientConnecting(void* user) {
@@ -184,20 +233,34 @@ static void OnClientConnecting(void* user) {
}
contexts->client_fd = client_fd;
+ if (!IoMuxerOnRead(&contexts->io_muxer, contexts->client_fd, &OnClientWriting,
+ user)) {
+ LOG("Failed to schedule client reading (%s)", strerror(errno));
+ goto drop_client;
+ }
+ contexts->input_handler = InputHandlerCreate();
+ if (!contexts->input_handler) {
+ LOG("Failed to create input handler");
+ goto drop_client;
+ }
+ if (!IoMuxerOnRead(&contexts->io_muxer,
+ InputHandlerGetEventsFd(contexts->input_handler),
+ &OnInputEvents, user)) {
+ LOG("Failed to schedule events reading (%s)", strerror(errno));
+ goto drop_client;
+ }
static const struct itimerspec spec = {
.it_interval.tv_nsec = capture_period,
.it_value.tv_nsec = capture_period,
};
if (timerfd_settime(contexts->timer_fd, 0, &spec, NULL)) {
LOG("Failed to arm timer (%s)", strerror(errno));
- goto rollback_client_fd;
+ goto drop_client;
}
return;
-rollback_client_fd:
- close(contexts->client_fd);
- contexts->client_fd = -1;
- return;
+drop_client:
+ MaybeDropClient(contexts);
}
int main(int argc, char* argv[]) {
@@ -255,9 +318,7 @@ int main(int argc, char* argv[]) {
g_signal = SIGABRT;
}
}
-
- if (contexts.encode_context) EncodeContextDestroy(contexts.encode_context);
- if (contexts.client_fd != -1) close(contexts.client_fd);
+ MaybeDropClient(&contexts);
rollback_capture_context:
CaptureContextDestroy(contexts.capture_context);