/*
* Copyright (C) 2024 Mikhail Burakov. This file is part of streamer.
*
* streamer is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* streamer is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with streamer. If not, see <https://www.gnu.org/licenses/>.
*/
#include "io_context.h"
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "proto.h"
#include "queue.h"
#include "util.h"
// State of one accepted connection together with its writer thread.
struct IoContext {
// Connected socket returned by accept(); IoContextRead() reads from it and
// the writer thread writes to it.
int fd;
// Cleared by IoContextDestroy() and by the writer thread when it fails;
// checked by IoContextWrite() and by the dequeue loop.
atomic_bool running;
// Outgoing ping/pong protos; popped before anything in the regular queue.
struct Queue prio;
// Outgoing protos of every other type.
struct Queue queue;
// Held around pushes to and pops from both queues.
mtx_t mutex;
// Broadcast when a proto is pushed and on destroy; the writer thread waits
// on it while both queues are empty.
cnd_t cond;
// Writer thread running IoContextThreadProc().
thrd_t thread;
};
// Single-allocation representation of a proto produced by IoContextRead():
// the public struct Proto, the header it points to, and the payload bytes.
// IoContextRead() copies a struct Proto to the start of this object and
// ProtoDestroy() frees the struct Proto pointer directly, so proto must stay
// the first member.
struct ProtoImpl {
struct Proto proto;
struct ProtoHeader header;
// Payload; IoContextRead() allocates header.size bytes for it.
uint8_t data[];
};
// Tells whether |proto| is a ping or a pong, i.e. whether it belongs in the
// priority queue rather than the regular one.
static bool IsPrioProto(const struct Proto* proto) {
  if (proto->header->type == kProtoTypePing) {
    return true;
  }
  return proto->header->type == kProtoTypePong;
}
// Destroy callback installed on protos created by IoContextRead(); releases
// the allocation that |proto| points into.
static void ProtoDestroy(struct Proto* proto) {
  free(proto);
}
// Reads exactly |size| bytes from |fd| into |buffer|.
// Returns true once everything is read (trivially so for size == 0). Returns
// false on a read error or when the stream ends before |size| bytes arrive.
// An interrupted read (EINTR) is reported as a failure and is not retried.
static bool ReadAll(int fd, void* buffer, size_t size) {
  uint8_t* ptr = buffer;
  while (size) {
    ssize_t result = read(fd, ptr, size);
    if (result < 0) {
      LOG("Failed to read socket (%s)", strerror(errno));
      return false;
    }
    if (result == 0) {
      // End of stream does not set errno, so strerror(errno) would print a
      // stale and misleading reason here.
      LOG("Failed to read socket (end of stream)");
      return false;
    }
    size -= (size_t)result;
    ptr += result;
  }
  return true;
}
// Writes the whole contents of the |count| entries of |iov| to |fd|.
// Returns true once everything is written (including when there is nothing
// to write), false if writev() fails or makes no progress.
// The entries of |iov| are modified in place to track partial writes, so the
// array cannot be reused by the caller afterwards.
static bool WriteAll(int fd, struct iovec* iov, size_t count) {
  for (;;) {
    // Skip entries with nothing left to write. This also guarantees that
    // |iov| is never dereferenced once |count| has reached zero.
    while (count && !iov->iov_len) {
      iov++;
      count--;
    }
    if (!count) {
      return true;
    }

    int max_count = (int)MIN(count, UIO_MAXIOV);
    ssize_t result = writev(fd, iov, max_count);
    if (result < 0) {
      LOG("Failed to write socket (%s)", strerror(errno));
      return false;
    }
    if (result == 0) {
      // The first entry is non-empty here, so zero means no progress.
      LOG("Failed to write socket (no progress)");
      return false;
    }

    // Consume fully written entries, stopping at the end of the array.
    size_t written = (size_t)result;
    while (count && written >= iov->iov_len) {
      written -= iov->iov_len;
      iov++;
      count--;
    }
    // Whatever is left is a partial write into the current entry.
    if (count) {
      iov->iov_base = (uint8_t*)iov->iov_base + written;
      iov->iov_len -= written;
    }
  }
}
// Blocks until a proto is available for writing or the context stops running.
// On success returns true and stores in |pproto| the next proto, taken from
// the priority queue first and the regular queue second, or NULL when both
// queues are empty and running has been cleared.
// Returns false only when the mutex cannot be locked.
static bool IoContextDequeue(struct IoContext* io_context,
                             struct Proto** pproto) {
  if (mtx_lock(&io_context->mutex) != thrd_success) {
    LOG("Failed to lock mutex (%s)", strerror(errno));
    return false;
  }
  void* item = NULL;
  while (!QueuePop(&io_context->prio, &item) &&
         !QueuePop(&io_context->queue, &item) &&
         atomic_load_explicit(&io_context->running, memory_order_relaxed)) {
    // The calls are kept outside of assert() so that they are still made
    // when the file is compiled with NDEBUG.
    int wait_status = cnd_wait(&io_context->cond, &io_context->mutex);
    assert(wait_status == thrd_success);
    (void)wait_status;
  }
  int unlock_status = mtx_unlock(&io_context->mutex);
  assert(unlock_status == thrd_success);
  (void)unlock_status;
  *pproto = item;
  return true;
}
// Writer thread entry point. Repeatedly dequeues a proto, writes its header
// followed by its payload to the socket, and destroys it. Exits when the
// dequeue yields no proto, or clears the running flag and exits when
// dequeueing or writing fails. Always returns 0.
static int IoContextThreadProc(void* arg) {
  struct IoContext* io_context = arg;
  bool healthy = true;
  while (healthy) {
    struct Proto* proto;
    if (!IoContextDequeue(io_context, &proto)) {
      LOG("Failed to dequeue proto");
      break;
    }
    if (!proto) {
      // Nothing left to send and the running flag was cleared elsewhere.
      return 0;
    }

    struct iovec chunks[2];
    chunks[0].iov_base = (void*)(uintptr_t)(proto->header);
    chunks[0].iov_len = sizeof(struct ProtoHeader);
    chunks[1].iov_base = (void*)(uintptr_t)(proto->data);
    chunks[1].iov_len = proto->header->size;

    healthy = WriteAll(io_context->fd, chunks, LENGTH(chunks));
    proto->Destroy(proto);
    if (!healthy) {
      LOG("Failed to write proto");
    }
  }
  atomic_store_explicit(&io_context->running, false, memory_order_relaxed);
  return 0;
}
// Listens on TCP |port| on any local address, blocks until one client
// connects, enables TCP_NODELAY on the connection and starts the writer
// thread for it.
// Returns the new context, to be released with IoContextDestroy(), or NULL
// on failure. The listening socket is closed before returning either way.
struct IoContext* IoContextCreate(uint16_t port) {
  // Receives the results of cleanup calls. Those calls are kept outside of
  // assert() so that they are still made when compiled with NDEBUG.
  int status = 0;
  int sock = socket(AF_INET, SOCK_STREAM, 0);
  if (sock == -1) {
    LOG("Failed to create socket (%s)", strerror(errno));
    return NULL;
  }
  if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &(int){1}, sizeof(int))) {
    LOG("Failed to reuse socket address (%s)", strerror(errno));
    goto rollback_sock;
  }
  const struct sockaddr_in sa = {
      .sin_family = AF_INET,
      .sin_port = htons(port),
      .sin_addr = {.s_addr = htonl(INADDR_ANY)},
  };
  if (bind(sock, (const struct sockaddr*)&sa, sizeof(sa))) {
    LOG("Failed to bind socket (%s)", strerror(errno));
    goto rollback_sock;
  }
  if (listen(sock, SOMAXCONN)) {
    LOG("Failed to listen socket (%s)", strerror(errno));
    goto rollback_sock;
  }

  struct IoContext* io_context = malloc(sizeof *io_context);
  if (!io_context) {
    LOG("Failed to allocate io context (%s)", strerror(errno));
    goto rollback_sock;
  }
  io_context->fd = accept(sock, NULL, NULL);
  if (io_context->fd == -1) {
    LOG("Failed to accept socket (%s)", strerror(errno));
    goto rollback_io_context;
  }
  if (setsockopt(io_context->fd, IPPROTO_TCP, TCP_NODELAY, &(int){1},
                 sizeof(int))) {
    LOG("Failed to set TCP_NODELAY (%s)", strerror(errno));
    goto rollback_fd;
  }

  atomic_init(&io_context->running, true);
  QueueCreate(&io_context->prio);
  QueueCreate(&io_context->queue);
  if (mtx_init(&io_context->mutex, mtx_plain) != thrd_success) {
    LOG("Failed to init mutex (%s)", strerror(errno));
    goto rollback_queues;
  }
  if (cnd_init(&io_context->cond) != thrd_success) {
    LOG("Failed to init condition variable (%s)", strerror(errno));
    goto rollback_mutex;
  }
  if (thrd_create(&io_context->thread, &IoContextThreadProc, io_context) !=
      thrd_success) {
    LOG("Failed to create thread (%s)", strerror(errno));
    goto rollback_cond;
  }
  status = close(sock);
  assert(!status);
  (void)status;
  return io_context;

rollback_cond:
  cnd_destroy(&io_context->cond);
rollback_mutex:
  mtx_destroy(&io_context->mutex);
rollback_queues:
  // Mirrors IoContextDestroy(), which destroys both queues it owns.
  QueueDestroy(&io_context->queue);
  QueueDestroy(&io_context->prio);
rollback_fd:
  status = close(io_context->fd);
  assert(!status);
rollback_io_context:
  free(io_context);
rollback_sock:
  status = close(sock);
  assert(!status);
  (void)status;
  return NULL;
}
struct Proto* IoContextRead(struct IoContext* io_context) {
struct ProtoHeader header;
if (!ReadAll(io_context->fd, &header, sizeof(header))) {
LOG("Failed to read proto header");
return NULL;
}
struct ProtoImpl* proto_impl = malloc(sizeof(struct ProtoImpl) + header.size);
if (!proto_impl) {
LOG("Failed to allocate proto (%s)", strerror(errno));
return NULL;
}
if (!ReadAll(io_context->fd, &proto_impl->data, header.size)) {
LOG("Failed to read proto body");
goto rollback_proto_impl;
}
proto_impl->header = header;
const struct Proto proto = {
.Destroy = ProtoDestroy,
.header = &proto_impl->header,
.data = proto_impl->data,
};
memcpy(proto_impl, &proto, sizeof(proto));
return &proto_impl->proto;
rollback_proto_impl:
free(proto_impl);
return NULL;
}
// Queues |proto| for the writer thread: pings and pongs go to the priority
// queue, everything else to the regular queue.
// Takes ownership of |proto|. On success it is destroyed by the writer thread
// after being written; on failure it is destroyed here.
// Returns false if the context is no longer running, the mutex cannot be
// locked or the push fails.
bool IoContextWrite(struct IoContext* io_context, struct Proto* proto) {
  // Receives the results of the condition variable and mutex calls. Those
  // calls are kept outside of assert() so that they are still made when
  // compiled with NDEBUG; otherwise the mutex would stay locked.
  int status = thrd_success;
  if (!atomic_load_explicit(&io_context->running, memory_order_relaxed)) {
    LOG("Io context is not running");
    goto rollback_proto;
  }
  struct Queue* queue =
      IsPrioProto(proto) ? &io_context->prio : &io_context->queue;
  if (mtx_lock(&io_context->mutex) != thrd_success) {
    LOG("Failed to lock mutex (%s)", strerror(errno));
    goto rollback_proto;
  }
  if (!QueuePush(queue, proto)) {
    LOG("Failed to queue proto");
    goto rollback_lock;
  }
  status = cnd_broadcast(&io_context->cond);
  assert(status == thrd_success);
  status = mtx_unlock(&io_context->mutex);
  assert(status == thrd_success);
  (void)status;
  return true;

rollback_lock:
  status = mtx_unlock(&io_context->mutex);
  assert(status == thrd_success);
  (void)status;
rollback_proto:
  proto->Destroy(proto);
  return false;
}
void IoContextDestroy(struct IoContext* io_context) {
atomic_store_explicit(&io_context->running, false, memory_order_relaxed);
assert(cnd_broadcast(&io_context->cond) == thrd_success);
assert(thrd_join(io_context->thread, NULL) == thrd_success);
cnd_destroy(&io_context->cond);
mtx_destroy(&io_context->mutex);
for (void* item; QueuePop(&io_context->prio, &item); free(item));
QueueDestroy(&io_context->prio);
for (void* item; QueuePop(&io_context->queue, &item); free(item));
QueueDestroy(&io_context->queue);
assert(!close(io_context->fd));
free(io_context);
}