From ed65c3d95ff259025c1a1b79bea5276ca4ee6d7e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Dziuba?= Date: Sun, 28 Jul 2024 03:18:39 +0200 Subject: [PATCH] net: add write queues without backpressure --- src/http/http.c | 5 ++- src/main.c | 4 +- src/net.c | 97 +++++++++++++++++++++++++++++++++++-------------- src/net.h | 18 ++++++--- src/queue.h | 10 ++++- 5 files changed, 96 insertions(+), 38 deletions(-) diff --git a/src/http/http.c b/src/http/http.c index 5205066..439c3bc 100644 --- a/src/http/http.c +++ b/src/http/http.c @@ -37,7 +37,10 @@ void lx_http_on_request(http_request_t *req) { } const char *response = "HTTP/1.1 200 OK\r\nContent-Length: 22\r\nContent-Type: text/html\r\n\r\n

Hello world!

\n"; - lx_write(req->connection, response, strlen(response)); + const char *body = "

Hello world!

\n"; + + lx_write(req->connection, response, strlen(response), NULL); + lx_write(req->connection, body, strlen(body), NULL); } #define MIN(a, b) ((a) < (b) ? (a) : (b)) diff --git a/src/main.c b/src/main.c index b86645a..7d69bee 100644 --- a/src/main.c +++ b/src/main.c @@ -42,8 +42,8 @@ void timer_test() { } int main() { - timer_test(); - //worker(1, 8000); + //timer_test(); + worker(1, 8000); return 0; } /* diff --git a/src/net.c b/src/net.c index 4d557db..d660c99 100644 --- a/src/net.c +++ b/src/net.c @@ -9,9 +9,7 @@ lx_connection_t *lx_connection_init(lx_io_t *ctx, socket_t fd) { conn->data = NULL; conn->size = 0; - conn->output = NULL; - conn->written = 0; - conn->output_size = 0; + queue_init(&conn->output); conn->event.ctx = ctx; conn->event.flags = 0; @@ -69,26 +67,37 @@ void lx_connection_read(lx_event_t *event) { /* epoll event handler for connection (EPOLLOUT) */ void lx_connection_write(lx_event_t *event) { + printf("Handle EPOLLOUT..."); lx_connection_t *conn = container_of(event, lx_connection_t, event); - // idk what to do in this scenario - if (conn->output == NULL) - return; + while (!queue_empty(&conn->output)) { + struct queue_node *next = conn->output.head; + lx_write_t *write_op = container_of(next, lx_write_t, qnode); - size_t to_write = conn->output_size - conn->written; - char *buf = conn->output + conn->written; + size_t to_write = write_op->size - write_op->written; + const char *buf = write_op->buf + write_op->written; + ssize_t written = write(conn->fd, buf, to_write); - ssize_t written = write(conn->fd, buf, to_write); - if (written == -1) - perror("socket::write"); - - conn->written += written; + if (written < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) + return; + + // idk what to do yet in this scenario + perror("socket->write"); + return; + } + + write_op->written += written; + + if (write_op->written == write_op->size) { + queue_pop(&conn->output); + if (write_op->cb) + write_op->cb(write_op->buf, write_op->size); + free(write_op); + } + } - if (conn->written == conn->output_size) { - printf("Everything is written\n"); - conn->written = 0; - conn->output_size = 0; - conn->output = NULL; + if (queue_empty(&conn->output)) { lx_stop_writing(event, conn->fd); } } @@ -141,19 +150,51 @@ void lx_close(lx_connection_t *conn) { free(conn); } +lx_write_t *enqueue_write(lx_connection_t *conn, const char *buf, size_t size, write_cb_t cb) { + printf("Enqueuing write...\n"); + lx_write_t *write_op = malloc(sizeof(lx_write_t)); + write_op->buf = buf; + write_op->size = size; + write_op->written = 0; + write_op->cb = cb; + queue_init_node(&write_op->qnode); + queue_push(&conn->output, &write_op->qnode); + + return write_op; +} + // TODO: add callback: void (*onwrite)(char *buf, size_t size) -int lx_write(lx_connection_t *conn, char *buf, size_t size) { - /* - int written = write(conn->fd, buf, size); +int lx_write(lx_connection_t *conn, const char *buf, size_t size, write_cb_t cb) { + if (queue_empty(&conn->output)) { + // try write + ssize_t written = write(conn->fd, buf, size); + if (written < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + enqueue_write(conn, buf, size, cb); + lx_set_write_event(&conn->event, conn->fd); + return 0; + } else { + return -1; + } + } - if (written < 0) { - if (written != EWOULDBLOCK || written != EAGAIN) - return -1; + // fully written + if (written == size) { + if (cb != NULL) + cb(buf, size); + return 1; + } + + // partial write: + lx_write_t *write_op = enqueue_write(conn, buf, size, cb); + write_op->written = written; + lx_set_write_event(&conn->event, conn->fd); + + return 0; } - */ - conn->output = buf; - conn->output_size = size; - //conn->written += written; + enqueue_write(conn, buf, size, cb); lx_set_write_event(&conn->event, conn->fd); + + return 0; } diff --git a/src/net.h b/src/net.h index f8aac62..48ee01f 100644 --- a/src/net.h +++ b/src/net.h @@ -4,6 +4,7 @@ #include #include #include +#include "queue.h" #define LX_NET_BUFFER_SIZE 8192 @@ -17,6 +18,16 @@ typedef struct lx_listener { void (*onaccept)(struct lx_connection *); } lx_listener_t; +typedef void (*write_cb_t)(const char*, size_t); + +typedef struct lx_write { + const char *buf; + size_t size; + size_t written; + write_cb_t cb; + struct queue_node qnode; +} lx_write_t; + typedef struct lx_connection { socket_t fd; void (*ondata)(struct lx_connection *); @@ -26,10 +37,7 @@ typedef struct lx_connection { lx_event_t event; size_t size; char buf[LX_NET_BUFFER_SIZE]; // TODO: define it as flexible array member? - // output buffers will be replaced with chain of buffers (queue of buffers) - char *output; - size_t output_size; - size_t written; + struct queue output; } lx_connection_t; #define lx_conn_ctx(conn_ptr) conn_ptr->event.ctx @@ -41,4 +49,4 @@ void lx_listener_handler(lx_event_t *event); void lx_connection_read(lx_event_t *event); void lx_connection_write(lx_event_t *event); void lx_close(lx_connection_t *conn); -int lx_write(lx_connection_t *conn, char *buf, size_t size); +int lx_write(lx_connection_t *conn, const char *buf, size_t size, write_cb_t cb); diff --git a/src/queue.h b/src/queue.h index 56bbfef..4ff1f3e 100644 --- a/src/queue.h +++ b/src/queue.h @@ -1,6 +1,8 @@ #pragma once #include "common.h" +#define queue_empty(qptr) (((qptr)->head == NULL) && ((qptr)->tail == NULL)) + struct queue_node { struct queue_node *next; struct queue_node *prev; @@ -11,10 +13,14 @@ struct queue { struct queue_node *tail; }; -static struct queue *queue_alloc() { - struct queue *q = malloc(sizeof(struct queue)); +static void queue_init(struct queue *q) { q->head = NULL; q->tail = NULL; +} + +static struct queue *queue_alloc() { + struct queue *q = malloc(sizeof(struct queue)); + queue_init(q); return q; }