Skip to content

Commit

Permalink
add base for async writing
Browse files Browse the repository at this point in the history
  • Loading branch information
michaldziuba03 committed Jul 27, 2024
1 parent 26ee1f3 commit 4346c5c
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 41 deletions.
55 changes: 45 additions & 10 deletions src/event.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,45 @@ void lx_make_nonblocking(int fd) {
}
}

void lx_add_event(lx_event_t *event, int fd) {
void lx_modify_event(lx_event_t *event, int fd, uint32_t operation, uint32_t flags) {
struct epoll_event ev;
ev.events = EPOLLIN;
event->flags = flags;

ev.events = event->flags;
ev.data.ptr = event;
if (epoll_ctl(event->ctx->epoll_fd, EPOLL_CTL_ADD, fd, &ev) == -1) {
perror("lx_add_event");
exit(EXIT_FAILURE);
if (epoll_ctl(event->ctx->epoll_fd, operation, fd, &ev) == -1) {
perror("lx_modify_event");
}
}

void lx_add_event(lx_event_t *event, int fd) {
event->flags |= EPOLLIN;
lx_modify_event(event, fd, EPOLL_CTL_ADD, event->flags);
}

void lx_set_read_event(lx_event_t *event, int fd) {
event->flags |= EPOLLIN;
lx_modify_event(event, fd, EPOLL_CTL_MOD, event->flags);
}

void lx_set_write_event(lx_event_t *event, int fd) {
event->flags |= EPOLLOUT;
lx_modify_event(event, fd, EPOLL_CTL_MOD, event->flags);
}

void lx_stop_reading(lx_event_t *event, int fd) {
event->flags &= ~EPOLLIN;
lx_modify_event(event, fd, EPOLL_CTL_MOD, event->flags);
}

void lx_stop_writing(lx_event_t *event, int fd) {
event->flags &= ~EPOLLOUT;
lx_modify_event(event, fd, EPOLL_CTL_MOD, event->flags);
}

void lx_remove_event(lx_event_t *event, int fd) {
if (epoll_ctl(event->ctx->epoll_fd, EPOLL_CTL_DEL, fd, NULL) == -1) {
perror("lx_remove_event");
exit(EXIT_FAILURE);
}
}

Expand All @@ -54,15 +79,25 @@ void lx_run(lx_io_t *ctx) {
while(true) {
int events_count = epoll_wait(ctx->epoll_fd, events, MAX_EVENTS, epoll_timeout);
if (events_count == -1) {
perror("epoll_wait");
exit(EXIT_FAILURE);
perror("epoll_wait");
exit(EXIT_FAILURE);
}

ctx->now = lx_now();

for (int i = 0; i < events_count; i++) {
lx_event_t *event = events[i].data.ptr;
event->handler(event);
lx_event_t *event = events[i].data.ptr;
if (events[i].events & EPOLLIN) {
if (event->read) {
event->read(event);
}
}

if (events[i].events & EPOLLOUT) {
if (event->write) {
event->write(event);
}
}
}

epoll_timeout = lx_timers_run(ctx);
Expand Down
12 changes: 9 additions & 3 deletions src/event.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <fcntl.h>
#include <sys/epoll.h>
#include <stdint.h>
#include <stdio.h>

#include "heap.h"

Expand All @@ -16,14 +17,19 @@ typedef struct lx_io {

typedef struct lx_event {
lx_io_t *ctx;
void (*handler)(struct lx_event *);
void *data;
int flags;
void (*read)(struct lx_event*);
void (*write)(struct lx_event*);
} lx_event_t;

/* initializes the event loop */
lx_io_t lx_init();
/* adds event to the epoll */
void lx_add_event(lx_event_t*, int);
void lx_add_event(lx_event_t *, int);
void lx_set_read_event(lx_event_t*, int);
void lx_set_write_event(lx_event_t*, int);
void lx_stop_reading(lx_event_t*, int);
void lx_stop_writing(lx_event_t*, int);
/* removes event from the epoll and destroy it */
void lx_remove_event(lx_event_t*, int);
void lx_make_nonblocking(int fd);
Expand Down
5 changes: 3 additions & 2 deletions src/heap.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#pragma once
#include <stdio.h>
#include <stdlib.h>
#include "stdbool.h"
#include "common.h"

struct heap_node {
Expand Down Expand Up @@ -216,18 +214,21 @@ static void heap_swap(struct heap *h, struct heap_node *child, struct heap_node

parent->parent = child;

// prevent self-reference:
if (tmp.left == child)
child->left = parent;
else
child->left = tmp.left;

// prevent self-reference:
if (tmp.right == child)
child->right = parent;
else
child->right = tmp.right;

child->parent = tmp.parent;

// don't forget to set new root..
if (!child->parent)
h->root = child;
}
19 changes: 11 additions & 8 deletions src/http/http.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,25 @@ void print_raw_headers(http_request_t *req) {
printf("}\n");
}

void print_raw_body(http_request_t *req) {
for (size_t i = 0; i < req->received; ++i) {
printf("%c", req->body[i]);
}
printf("\n");
}

// handle request with parsed headers
void lx_http_on_request(http_request_t *req) {
log_info("%s %s", http_map_method(req->method), req->path);
print_raw_headers(req);

if (req->body) {
log_info("Request with body (%ld):", req->received);
//fwrite(req->body, 1, req->received + 1, stdout);
for (size_t i = 0; i < req->received; ++i) {
printf("%c", req->body[i]);
}
printf("\n");
print_raw_body(req);
}

const char *response = "HTTP/1.1 200 OK\r\nContent-Length: 22\r\nContent-Type: text/html\r\n\r\n<h1>Hello world!</h1>\n";
send(req->connection->fd, response, strlen(response), 0);
lx_write(req->connection, response, strlen(response));
}

#define MIN(a, b) ((a) < (b) ? (a) : (b))
Expand Down Expand Up @@ -109,7 +112,7 @@ void lx_http_read_headers(lx_connection_t *conn) {
size_t nread = req->parser.nread;
if (req->content_length == 0) {
lx_http_on_request(req);
lx_close(conn);
//lx_close(conn);
return;
}

Expand All @@ -127,7 +130,7 @@ void lx_http_read_headers(lx_connection_t *conn) {

if (req->received == req->content_length) {
lx_http_on_request(req);
lx_close(conn);
//lx_close(conn);
return;
}

Expand Down
64 changes: 52 additions & 12 deletions src/net.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,14 @@ lx_connection_t *lx_connection_init(lx_io_t *ctx, socket_t fd) {
conn->data = NULL;
conn->size = 0;

conn->output = NULL;
conn->written = 0;
conn->output_size = 0;

conn->event.ctx = ctx;
conn->event.data = NULL;
conn->event.handler = lx_connection_handler;
conn->event.flags = 0;
conn->event.read = lx_connection_read;
conn->event.write = lx_connection_write;

return conn;
}
Expand All @@ -23,8 +28,9 @@ lx_listener_t *lx_listener_init(lx_io_t *ctx, socket_t lfd) {
listener->onaccept = NULL;

listener->event.ctx = ctx;
listener->event.handler = lx_listener_handler;
listener->event.data = NULL;
listener->event.flags = 0;
listener->event.read = lx_listener_handler;
listener->event.write = NULL;

return listener;
}
Expand Down Expand Up @@ -55,12 +61,38 @@ void lx_listener_handler(lx_event_t *event) {
}

/* epoll event handler for connection (EPOLLIN) */
void lx_connection_handler(lx_event_t *event) {
void lx_connection_read(lx_event_t *event) {
lx_connection_t *conn = container_of(event, lx_connection_t, event);
if (conn->ondata != NULL)
conn->ondata(conn);
}

/* epoll event handler for connection (EPOLLOUT) */
void lx_connection_write(lx_event_t *event) {
lx_connection_t *conn = container_of(event, lx_connection_t, event);

// idk what to do in this scenario
if (conn->output == NULL)
return;

size_t to_write = conn->output_size - conn->written;
char *buf = conn->output + conn->written;

ssize_t written = write(conn->fd, buf, to_write);
if (written == -1)
perror("socket::write");

conn->written += written;

if (conn->written == conn->output_size) {
printf("Everything is written\n");
conn->written = 0;
conn->output_size = 0;
conn->output = NULL;
lx_stop_writing(event, conn->fd);
}
}

lx_listener_t *lx_listen(lx_io_t *ctx, int port, void (*onaccept)(struct lx_connection *)) {
socket_t lfd = socket(AF_INET, SOCK_STREAM, 0);
if (lfd <= 0) {
Expand All @@ -75,7 +107,7 @@ lx_listener_t *lx_listen(lx_io_t *ctx, int port, void (*onaccept)(struct lx_conn

int opt = 1;

if (setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
if (setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt))) {
perror("setsockopt");
exit(EXIT_FAILURE);
}
Expand Down Expand Up @@ -109,11 +141,19 @@ void lx_close(lx_connection_t *conn) {
free(conn);
}

int lx_recv(lx_connection_t *conn) {
int bytes = recv(conn->fd, conn->buf + conn->size, LX_NET_BUFFER_SIZE - conn->size, 0);
if (bytes <= 0)
return bytes;
// TODO: add callback: void (*onwrite)(char *buf, size_t size)
int lx_write(lx_connection_t *conn, char *buf, size_t size) {
/*
int written = write(conn->fd, buf, size);
if (written < 0) {
if (written != EWOULDBLOCK || written != EAGAIN)
return -1;
}
*/
conn->output = buf;
conn->output_size = size;
//conn->written += written;

conn->size += bytes;
return bytes;
lx_set_write_event(&conn->event, conn->fd);
}
13 changes: 7 additions & 6 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ typedef struct lx_connection {
void *data; // pointer to the higher level protocol object
lx_listener_t *listener;
lx_event_t event;
/* buffer is stored in TCP connection object,
* because I want to support multiple protocols (http and ws), each with own struct
* so I can reuse same buffer to save memory usage
*/
size_t size;
char buf[LX_NET_BUFFER_SIZE]; // TODO: define it as flexible array member?
// output buffers will be replaced with chain of buffers (queue of buffers)
char *output;
size_t output_size;
size_t written;
} lx_connection_t;

#define lx_conn_ctx(conn_ptr) conn_ptr->event.ctx
Expand All @@ -38,6 +38,7 @@ lx_listener_t *lx_listen(lx_io_t *ctx, int port, void (*)(lx_connection_t*));
lx_connection_t *lx_connection_init(lx_io_t *ctx, socket_t fd);
lx_listener_t *lx_listener_init(lx_io_t *ctx, socket_t lfd);
void lx_listener_handler(lx_event_t *event);
void lx_connection_handler(lx_event_t *event);
void lx_connection_read(lx_event_t *event);
void lx_connection_write(lx_event_t *event);
void lx_close(lx_connection_t *conn);
int lx_recv(lx_connection_t *conn);
int lx_write(lx_connection_t *conn, char *buf, size_t size);

0 comments on commit 4346c5c

Please sign in to comment.