Skip to content

Commit

Permalink
feat: add basic threadpool
Browse files Browse the repository at this point in the history
  • Loading branch information
michaldziuba03 committed Aug 19, 2024
1 parent 5113984 commit 273b7c0
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 58 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ $(TEST_BUILD_DIR)/%.o: $(TEST_DIR)/%.c
@mkdir -p $(dir $@)
$(CC) $(CFLAGS) -c $< -o $@

samples: lib $(SAMPLES_OBJECTS)
samples: $(SAMPLES_OBJECTS)
@mkdir -p $(BUILD_DIR)/samples
$(CC) $(CFLAGS) $(SAMPLES_OBJECTS) -L$(BUILD_DIR) -lpandio -lrt -o $(BUILD_DIR)/samples/tcp_echo

Expand Down
2 changes: 2 additions & 0 deletions docs/tcp.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Pandio

Docs not ready yet

## TCP streams

Creating TCP server
Expand Down
27 changes: 23 additions & 4 deletions samples/tcp_echo.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,17 +98,36 @@ void handle_connect(pnd_tcp_t *client, pnd_fd_t fd) {
pnd_tcp_write(client, write_op);
}

void long_task() {
printf("Long task started...\n");
sleep(3);
}

void after_task() {
printf("Long task done, but from main thread...\n");
}

int main() {
printf("Starting client, pid is: %d\n", getpid());
pnd_io_t ctx;
pnd_io_init(&ctx);
pnd_io_t *ctx = malloc(sizeof(pnd_io_t));
pnd_io_init(ctx);

pnd_task_t *task = malloc(sizeof(pnd_task_t));
task->work = long_task;
task->done = after_task;
pnd_work_submit(ctx, task);

pnd_task_t *task2 = malloc(sizeof(pnd_task_t));
task2->work = long_task;
task2->done = after_task;
pnd_work_submit(ctx, task2);

pnd_tcp_t *client = malloc(sizeof(pnd_tcp_t));
pnd_tcp_init(&ctx, client);
pnd_tcp_init(ctx, client);
int status = pnd_tcp_connect(client, "127.0.0.1", 3000, handle_connect);
printf("Connect status: %d\n", status);

pnd_io_run(&ctx);
pnd_io_run(ctx);

return 0;
}
15 changes: 15 additions & 0 deletions src/pandio.c
Original file line number Diff line number Diff line change
@@ -1,18 +1,33 @@
#include "pandio.h"
#include "timers.h"
#include "unix/poll.h"
#include <sys/eventfd.h>
#include "unix/threadpool.h"

void pnd_io_init(pnd_io_t *ctx)
{
ctx->poll_handle = 0;
ctx->handles = 0;
ctx->now = 0;
ctx->task_signal = eventfd(0, 0);

pnd_timers_heap_init(ctx);
queue_init(&ctx->pending_closes);
queue_init(&ctx->tasks_done);
pnd_poll_init(ctx);

pnd_set_nonblocking(ctx->task_signal);

pnd_event_t *task_event = malloc(sizeof(pnd_event_t));
pnd_init_event(task_event);
task_event->ctx = ctx;
task_event->callback = pnd_work_done_io;
pnd_add_event_readable(task_event, ctx->task_signal);

pnd_workers_init();
}


void pnd_tcp_pending_close(pnd_io_t *ctx)
{
while (!queue_empty(&ctx->pending_closes)) {
Expand Down
3 changes: 3 additions & 0 deletions src/pandio.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@

struct pnd_io {
pnd_fd_t poll_handle;
pnd_fd_t task_signal;
size_t handles;
uint64_t now;
struct heap timers;
struct queue pending_closes;
struct queue tasks_done;
};

typedef struct pnd_io pnd_io_t;
Expand All @@ -52,6 +54,7 @@ typedef struct pnd_event pnd_event_t;
// exports
#include "tcp_stream.h"
#include "timers.h"
#include "unix/threadpool.h"

void pnd_io_init(pnd_io_t *ctx);

Expand Down
116 changes: 63 additions & 53 deletions src/unix/threadpool.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,13 @@
* SOFTWARE.
*/

#include <pthread.h>
#include "threadpool.h"
#include <stdlib.h>
#include <stdbool.h>
#include <unistd.h>
#include <stdio.h>

#include "../queue.h"

#define THREAD_POOL_SIZE 4

struct pnd_task {
void (*func)(void*);
void *args;
struct queue_node qnode;
};

typedef struct pnd_task pnd_task_t;
#include "poll.h"
#include <sys/eventfd.h>

static size_t nthreads = 0;
static pthread_t *threads = NULL;
Expand All @@ -45,6 +35,13 @@ static struct queue tasks = { .head = NULL, .tail = NULL };
static size_t ntasks = 0;


void pnd_work_done_signal(pnd_io_t *ctx)
{
int64_t u = 1;
write(ctx->task_signal, &u, sizeof(int64_t));
}


void *pnd_work_exec(void *arg) {
while (true) {
pthread_mutex_lock(&mut);
Expand All @@ -54,75 +51,88 @@ void *pnd_work_exec(void *arg) {
pthread_cond_wait(&cond, &mut);
}


struct queue_node *node = queue_pop(&tasks);
pnd_task_t *task = container_of(node, pnd_task_t, qnode);
ntasks--;

pthread_mutex_unlock(&mut);

if (task->func)
task->func(task->args);
if (task->work)
task->work(task);

queue_init_node(&task->qnode); // reset qnode and later push to the tasks done queue

free(task);
pthread_mutex_lock(&mut);
queue_push(&task->ctx->tasks_done, &task->qnode);

pnd_work_done_signal(task->ctx);
pthread_mutex_unlock(&mut);
}
}


void pnd_work_submit(void (*func)(void*), void *args) {
pnd_task_t *task = malloc(sizeof(pnd_task_t));
void pnd_work_done(pnd_io_t *ctx)
{
struct queue tasks_done;
queue_init(&tasks_done);

task->func = func;
task->args = args;
pthread_mutex_lock(&mut);

// copy data from synchronized queue to the local queue
while (!queue_empty(&ctx->tasks_done)) {
struct queue_node *node = queue_pop(&ctx->tasks_done);
queue_push(&tasks_done, node);
}

pthread_mutex_unlock(&mut);

// execute done callbacks in the main thread
while (!queue_empty(&tasks_done)) {
struct queue_node *node = queue_pop(&tasks_done);
pnd_task_t *task = container_of(node, pnd_task_t, qnode);

if (task->done)
task->done(task);
}
}


void pnd_work_done_io(pnd_event_t *ev, unsigned events)
{
if (events & PND_READABLE) {
uint64_t u;
if (read(ev->ctx->task_signal, &u, sizeof(uint64_t)) == sizeof(uint64_t))
pnd_work_done(ev->ctx);
}
}


void pnd_work_submit(pnd_io_t *ctx, pnd_task_t *task)
{
task->ctx = ctx;

queue_init_node(&task->qnode);
pthread_mutex_lock(&mut);
queue_push(&tasks, &task->qnode);
ntasks++;

pthread_cond_signal(&cond);
pthread_mutex_unlock(&mut);
}


void pnd_workers_init() {
void pnd_workers_init()
{
// init only once
if (nthreads != 0)
return;

nthreads = THREAD_POOL_SIZE;
threads = malloc(nthreads * sizeof(pthread_t));

printf("Thread pool size: %ld\n", nthreads);

pthread_mutex_init(&mut, NULL);
pthread_cond_init(&cond, NULL);

for (int i = 0; i < nthreads; ++i) {
pthread_create(&threads[i], NULL, pnd_work_exec, NULL);
}
}


void pnd_workers_stop() {
void *ret = NULL;

for (int i = 0; i < nthreads; ++i) {
pthread_join(threads[i], ret);
}
}


// "long taks"
void test_task(void *args) {
sleep(5);
printf("Hello World\n");
}


int main() {
pnd_workers_init();
for (int i = 0; i < 4; ++i) {
pnd_work_submit(test_task, NULL);
}

pnd_workers_stop();

return 0;
}
46 changes: 46 additions & 0 deletions src/unix/threadpool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/* Copyright (c) Michał Dziuba
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/


#pragma once
#include <pthread.h>
#include <stdlib.h>
#include "queue.h"
#include "poll.h"
#include <sys/eventfd.h>

#define THREAD_POOL_SIZE 4

struct pnd_task {
pnd_io_t *ctx;
void (*work)(struct pnd_task*);
void (*done)(struct pnd_task*);
void *data;
struct queue_node qnode;
};

typedef struct pnd_task pnd_task_t;

void pnd_work_done_io(pnd_event_t *ev, unsigned events);

void pnd_work_submit(pnd_io_t *ctx, pnd_task_t *task);

void pnd_workers_init();

0 comments on commit 273b7c0

Please sign in to comment.