Skip to content

Commit

Permalink
net: add write queues without backpressure
Browse files Browse the repository at this point in the history
  • Loading branch information
michaldziuba03 committed Jul 28, 2024
1 parent 29c9a12 commit ed65c3d
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 38 deletions.
5 changes: 4 additions & 1 deletion src/http/http.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ void lx_http_on_request(http_request_t *req) {
}

const char *response = "HTTP/1.1 200 OK\r\nContent-Length: 22\r\nContent-Type: text/html\r\n\r\n<h1>Hello world!</h1>\n";
lx_write(req->connection, response, strlen(response));
const char *body = "<h1>Hello world!</h1>\n";

lx_write(req->connection, response, strlen(response), NULL);
lx_write(req->connection, body, strlen(body), NULL);
}

#define MIN(a, b) ((a) < (b) ? (a) : (b))
Expand Down
4 changes: 2 additions & 2 deletions src/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ void timer_test() {
}

int main() {
timer_test();
//worker(1, 8000);
//timer_test();
worker(1, 8000);
return 0;
}
/*
Expand Down
97 changes: 69 additions & 28 deletions src/net.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ lx_connection_t *lx_connection_init(lx_io_t *ctx, socket_t fd) {
conn->data = NULL;
conn->size = 0;

conn->output = NULL;
conn->written = 0;
conn->output_size = 0;
queue_init(&conn->output);

conn->event.ctx = ctx;
conn->event.flags = 0;
Expand Down Expand Up @@ -69,26 +67,37 @@ void lx_connection_read(lx_event_t *event) {

/* epoll event handler for connection (EPOLLOUT) */
void lx_connection_write(lx_event_t *event) {
printf("Handle EPOLLOUT...");
lx_connection_t *conn = container_of(event, lx_connection_t, event);

// idk what to do in this scenario
if (conn->output == NULL)
return;
while (!queue_empty(&conn->output)) {
struct queue_node *next = conn->output.head;
lx_write_t *write_op = container_of(next, lx_write_t, qnode);

size_t to_write = conn->output_size - conn->written;
char *buf = conn->output + conn->written;
size_t to_write = write_op->size - write_op->written;
const char *buf = write_op->buf + write_op->written;
ssize_t written = write(conn->fd, buf, to_write);

ssize_t written = write(conn->fd, buf, to_write);
if (written == -1)
perror("socket::write");

conn->written += written;
if (written < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK)
return;

// idk what to do yet in this scenario
perror("socket->write");
return;
}

write_op->written += written;

if (write_op->written == write_op->size) {
queue_pop(&conn->output);
if (write_op->cb)
write_op->cb(write_op->buf, write_op->size);
free(write_op);
}
}

if (conn->written == conn->output_size) {
printf("Everything is written\n");
conn->written = 0;
conn->output_size = 0;
conn->output = NULL;
if (queue_empty(&conn->output)) {
lx_stop_writing(event, conn->fd);
}
}
Expand Down Expand Up @@ -141,19 +150,51 @@ void lx_close(lx_connection_t *conn) {
free(conn);
}

lx_write_t *enqueue_write(lx_connection_t *conn, const char *buf, size_t size, write_cb_t cb) {
printf("Enqueuing write...\n");
lx_write_t *write_op = malloc(sizeof(lx_write_t));
write_op->buf = buf;
write_op->size = size;
write_op->written = 0;
write_op->cb = cb;
queue_init_node(&write_op->qnode);
queue_push(&conn->output, &write_op->qnode);

return write_op;
}

// TODO: add callback: void (*onwrite)(char *buf, size_t size)
int lx_write(lx_connection_t *conn, char *buf, size_t size) {
/*
int written = write(conn->fd, buf, size);
int lx_write(lx_connection_t *conn, const char *buf, size_t size, write_cb_t cb) {
if (queue_empty(&conn->output)) {
// try write
ssize_t written = write(conn->fd, buf, size);
if (written < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
enqueue_write(conn, buf, size, cb);
lx_set_write_event(&conn->event, conn->fd);
return 0;
} else {
return -1;
}
}

if (written < 0) {
if (written != EWOULDBLOCK || written != EAGAIN)
return -1;
// fully written
if (written == size) {
if (cb != NULL)
cb(buf, size);
return 1;
}

// partial write:
lx_write_t *write_op = enqueue_write(conn, buf, size, cb);
write_op->written = written;
lx_set_write_event(&conn->event, conn->fd);

return 0;
}
*/
conn->output = buf;
conn->output_size = size;
//conn->written += written;

enqueue_write(conn, buf, size, cb);
lx_set_write_event(&conn->event, conn->fd);

return 0;
}
18 changes: 13 additions & 5 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <sys/types.h>
#include <netinet/in.h>
#include <unistd.h>
#include "queue.h"

#define LX_NET_BUFFER_SIZE 8192

Expand All @@ -17,6 +18,16 @@ typedef struct lx_listener {
void (*onaccept)(struct lx_connection *);
} lx_listener_t;

typedef void (*write_cb_t)(const char*, size_t);

typedef struct lx_write {
const char *buf;
size_t size;
size_t written;
write_cb_t cb;
struct queue_node qnode;
} lx_write_t;

typedef struct lx_connection {
socket_t fd;
void (*ondata)(struct lx_connection *);
Expand All @@ -26,10 +37,7 @@ typedef struct lx_connection {
lx_event_t event;
size_t size;
char buf[LX_NET_BUFFER_SIZE]; // TODO: define it as flexible array member?
// output buffers will be replaced with chain of buffers (queue of buffers)
char *output;
size_t output_size;
size_t written;
struct queue output;
} lx_connection_t;

#define lx_conn_ctx(conn_ptr) conn_ptr->event.ctx
Expand All @@ -41,4 +49,4 @@ void lx_listener_handler(lx_event_t *event);
void lx_connection_read(lx_event_t *event);
void lx_connection_write(lx_event_t *event);
void lx_close(lx_connection_t *conn);
int lx_write(lx_connection_t *conn, char *buf, size_t size);
int lx_write(lx_connection_t *conn, const char *buf, size_t size, write_cb_t cb);
10 changes: 8 additions & 2 deletions src/queue.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#pragma once
#include "common.h"

#define queue_empty(qptr) (((qptr)->head == NULL) && ((qptr)->tail == NULL))

struct queue_node {
struct queue_node *next;
struct queue_node *prev;
Expand All @@ -11,10 +13,14 @@ struct queue {
struct queue_node *tail;
};

static struct queue *queue_alloc() {
struct queue *q = malloc(sizeof(struct queue));
static void queue_init(struct queue *q) {
q->head = NULL;
q->tail = NULL;
}

static struct queue *queue_alloc() {
struct queue *q = malloc(sizeof(struct queue));
queue_init(q);

return q;
}
Expand Down

0 comments on commit ed65c3d

Please sign in to comment.