diff --git a/src/http/http.c b/src/http/http.c
index 5205066..439c3bc 100644
--- a/src/http/http.c
+++ b/src/http/http.c
@@ -37,7 +37,10 @@ void lx_http_on_request(http_request_t *req) {
}
const char *response = "HTTP/1.1 200 OK\r\nContent-Length: 22\r\nContent-Type: text/html\r\n\r\n
Hello world!
\n";
- lx_write(req->connection, response, strlen(response));
+ const char *body = "Hello world!
\n";
+
+ lx_write(req->connection, response, strlen(response), NULL);
+ lx_write(req->connection, body, strlen(body), NULL);
}
#define MIN(a, b) ((a) < (b) ? (a) : (b))
diff --git a/src/main.c b/src/main.c
index b86645a..7d69bee 100644
--- a/src/main.c
+++ b/src/main.c
@@ -42,8 +42,8 @@ void timer_test() {
}
int main() {
- timer_test();
- //worker(1, 8000);
+ //timer_test();
+ worker(1, 8000);
return 0;
}
/*
diff --git a/src/net.c b/src/net.c
index 4d557db..d660c99 100644
--- a/src/net.c
+++ b/src/net.c
@@ -9,9 +9,7 @@ lx_connection_t *lx_connection_init(lx_io_t *ctx, socket_t fd) {
conn->data = NULL;
conn->size = 0;
- conn->output = NULL;
- conn->written = 0;
- conn->output_size = 0;
+ queue_init(&conn->output);
conn->event.ctx = ctx;
conn->event.flags = 0;
@@ -69,26 +67,37 @@ void lx_connection_read(lx_event_t *event) {
/* epoll event handler for connection (EPOLLOUT) */
void lx_connection_write(lx_event_t *event) {
+ printf("Handle EPOLLOUT...");
lx_connection_t *conn = container_of(event, lx_connection_t, event);
- // idk what to do in this scenario
- if (conn->output == NULL)
- return;
+ while (!queue_empty(&conn->output)) {
+ struct queue_node *next = conn->output.head;
+ lx_write_t *write_op = container_of(next, lx_write_t, qnode);
- size_t to_write = conn->output_size - conn->written;
- char *buf = conn->output + conn->written;
+ size_t to_write = write_op->size - write_op->written;
+ const char *buf = write_op->buf + write_op->written;
+ ssize_t written = write(conn->fd, buf, to_write);
- ssize_t written = write(conn->fd, buf, to_write);
- if (written == -1)
- perror("socket::write");
-
- conn->written += written;
+ if (written < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK)
+ return;
+
+ // idk what to do yet in this scenario
+ perror("socket->write");
+ return;
+ }
+
+ write_op->written += written;
+
+ if (write_op->written == write_op->size) {
+ queue_pop(&conn->output);
+ if (write_op->cb)
+ write_op->cb(write_op->buf, write_op->size);
+ free(write_op);
+ }
+ }
- if (conn->written == conn->output_size) {
- printf("Everything is written\n");
- conn->written = 0;
- conn->output_size = 0;
- conn->output = NULL;
+ if (queue_empty(&conn->output)) {
lx_stop_writing(event, conn->fd);
}
}
@@ -141,19 +150,51 @@ void lx_close(lx_connection_t *conn) {
free(conn);
}
+lx_write_t *enqueue_write(lx_connection_t *conn, const char *buf, size_t size, write_cb_t cb) {
+ printf("Enqueuing write...\n");
+ lx_write_t *write_op = malloc(sizeof(lx_write_t));
+ write_op->buf = buf;
+ write_op->size = size;
+ write_op->written = 0;
+ write_op->cb = cb;
+ queue_init_node(&write_op->qnode);
+ queue_push(&conn->output, &write_op->qnode);
+
+ return write_op;
+}
+
// TODO: add callback: void (*onwrite)(char *buf, size_t size)
-int lx_write(lx_connection_t *conn, char *buf, size_t size) {
- /*
- int written = write(conn->fd, buf, size);
+int lx_write(lx_connection_t *conn, const char *buf, size_t size, write_cb_t cb) {
+ if (queue_empty(&conn->output)) {
+ // try write
+ ssize_t written = write(conn->fd, buf, size);
+ if (written < 0) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ enqueue_write(conn, buf, size, cb);
+ lx_set_write_event(&conn->event, conn->fd);
+ return 0;
+ } else {
+ return -1;
+ }
+ }
- if (written < 0) {
- if (written != EWOULDBLOCK || written != EAGAIN)
- return -1;
+ // fully written
+ if (written == size) {
+ if (cb != NULL)
+ cb(buf, size);
+ return 1;
+ }
+
+ // partial write:
+ lx_write_t *write_op = enqueue_write(conn, buf, size, cb);
+ write_op->written = written;
+ lx_set_write_event(&conn->event, conn->fd);
+
+ return 0;
}
- */
- conn->output = buf;
- conn->output_size = size;
- //conn->written += written;
+ enqueue_write(conn, buf, size, cb);
lx_set_write_event(&conn->event, conn->fd);
+
+ return 0;
}
diff --git a/src/net.h b/src/net.h
index f8aac62..48ee01f 100644
--- a/src/net.h
+++ b/src/net.h
@@ -4,6 +4,7 @@
#include
#include
#include
+#include "queue.h"
#define LX_NET_BUFFER_SIZE 8192
@@ -17,6 +18,16 @@ typedef struct lx_listener {
void (*onaccept)(struct lx_connection *);
} lx_listener_t;
+typedef void (*write_cb_t)(const char*, size_t);
+
+typedef struct lx_write {
+ const char *buf;
+ size_t size;
+ size_t written;
+ write_cb_t cb;
+ struct queue_node qnode;
+} lx_write_t;
+
typedef struct lx_connection {
socket_t fd;
void (*ondata)(struct lx_connection *);
@@ -26,10 +37,7 @@ typedef struct lx_connection {
lx_event_t event;
size_t size;
char buf[LX_NET_BUFFER_SIZE]; // TODO: define it as flexible array member?
- // output buffers will be replaced with chain of buffers (queue of buffers)
- char *output;
- size_t output_size;
- size_t written;
+ struct queue output;
} lx_connection_t;
#define lx_conn_ctx(conn_ptr) conn_ptr->event.ctx
@@ -41,4 +49,4 @@ void lx_listener_handler(lx_event_t *event);
void lx_connection_read(lx_event_t *event);
void lx_connection_write(lx_event_t *event);
void lx_close(lx_connection_t *conn);
-int lx_write(lx_connection_t *conn, char *buf, size_t size);
+int lx_write(lx_connection_t *conn, const char *buf, size_t size, write_cb_t cb);
diff --git a/src/queue.h b/src/queue.h
index 56bbfef..4ff1f3e 100644
--- a/src/queue.h
+++ b/src/queue.h
@@ -1,6 +1,8 @@
#pragma once
#include "common.h"
+#define queue_empty(qptr) (((qptr)->head == NULL) && ((qptr)->tail == NULL))
+
struct queue_node {
struct queue_node *next;
struct queue_node *prev;
@@ -11,10 +13,14 @@ struct queue {
struct queue_node *tail;
};
-static struct queue *queue_alloc() {
- struct queue *q = malloc(sizeof(struct queue));
+static void queue_init(struct queue *q) {
q->head = NULL;
q->tail = NULL;
+}
+
+static struct queue *queue_alloc() {
+ struct queue *q = malloc(sizeof(struct queue));
+ queue_init(q);
return q;
}