Skip to content

Commit

Permalink
split buffered and unbuffered channels
Browse files Browse the repository at this point in the history
  • Loading branch information
Snikimonkd committed Jan 19, 2025
1 parent 58af665 commit a624054
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 122 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,47 @@
module std::thread::channel(<Type, SIZE>);
module std::thread::channel(<Type>);

struct BufferedChannel @private
distinct BufferedChannel = void*;

fn void! BufferedChannel.new_init(&self, usz size, Allocator allocator = allocator::heap())
{
BufferedChannelImpl* channel = allocator::new_with_padding(allocator, BufferedChannelImpl, Type.sizeof * size)!;
channel.init(size, allocator)!;
*self = (BufferedChannel)channel;
return;
}

fn void! BufferedChannel.destroy(&self)
{
BufferedChannelImpl* channel = (BufferedChannelImpl*)(*self);
channel.destroy()!;
return;
}


fn void! BufferedChannel.push(&self, Type val)
{
BufferedChannelImpl* channel = (BufferedChannelImpl*)(*self);
return channel.push(val);
}

fn Type! BufferedChannel.pop(&self)
{
BufferedChannelImpl* channel = (BufferedChannelImpl*)(*self);
return channel.pop();
}

fn void! BufferedChannel.close(&self)
{
BufferedChannelImpl* channel = (BufferedChannelImpl*)(*self);
return channel.close();
}

struct BufferedChannelImpl @private
{
Allocator allocator;
Mutex mu;
Type[] buf;
bool closed;
usz size;
usz elems;

usz sendx;
Expand All @@ -15,12 +51,14 @@ struct BufferedChannel @private
usz readx;
usz read_waiting;
ConditionVariable read_cond;

Type[*] buf;
}

fn void! BufferedChannel.init(&self, Allocator allocator) @private
fn void! BufferedChannelImpl.init(&self, usz size, Allocator allocator) @private
{
self.allocator = allocator;
self.buf = allocator::new_array(allocator, Type, SIZE);
self.size = size;
self.elems = 0;
self.sendx = 0;
self.send_waiting = 0;
Expand All @@ -35,24 +73,23 @@ fn void! BufferedChannel.init(&self, Allocator allocator) @private
defer catch (void)self.read_cond.destroy();
}

fn void! BufferedChannel.destroy(&self) @private
fn void! BufferedChannelImpl.destroy(&self) @private
{
anyfault err = @catch(self.mu.destroy());
err = @catch(self.send_cond.destroy()) ?: err;
err = @catch(self.read_cond.destroy()) ?: err;
allocator::free(self.allocator, self.buf);
allocator::free(self.allocator, self);

if (err) return err?;
}

fn void! BufferedChannel.push(&self, Type val) @private
fn void! BufferedChannelImpl.push(&self, Type val) @private
{
self.mu.lock()!;
defer catch (void)self.mu.unlock();

// if chan is full -> wait
while (self.elems == SIZE && !self.closed)
while (self.elems == self.size && !self.closed)
{
self.send_waiting++;
self.send_cond.wait(&self.mu)!;
Expand All @@ -70,7 +107,7 @@ fn void! BufferedChannel.push(&self, Type val) @private

// move pointer
self.sendx++;
if (self.sendx == SIZE)
if (self.sendx == self.size)
{
self.sendx = 0;
}
Expand All @@ -89,7 +126,7 @@ fn void! BufferedChannel.push(&self, Type val) @private
return;
}

fn Type! BufferedChannel.pop(&self) @private
fn Type! BufferedChannelImpl.pop(&self) @private
{
self.mu.lock()!;
defer catch (void)self.mu.unlock();
Expand All @@ -113,7 +150,7 @@ fn Type! BufferedChannel.pop(&self) @private

// move pointer
self.readx++;
if (self.readx == SIZE)
if (self.readx == self.size)
{
self.readx = 0;
}
Expand All @@ -132,7 +169,7 @@ fn Type! BufferedChannel.pop(&self) @private
return ret;
}

fn void! BufferedChannel.close(&self) @private
fn void! BufferedChannelImpl.close(&self) @private
{
anyfault err = @catch(self.mu.lock());

Expand Down
70 changes: 0 additions & 70 deletions lib/std/threads/channel.c3

This file was deleted.

49 changes: 42 additions & 7 deletions lib/std/threads/unbuffered_channel.c3
Original file line number Diff line number Diff line change
@@ -1,6 +1,41 @@
module std::thread::channel(<Type, SIZE>);
module std::thread::channel(<Type>);

struct UnbufferedChannel @private
distinct UnbufferedChannel = void*;

fn void! UnbufferedChannel.new_init(&self, Allocator allocator = allocator::heap())
{
UnbufferedChannelImpl* channel = allocator::alloc(allocator, UnbufferedChannelImpl);
channel.init(allocator)!;
*self = (UnbufferedChannel)channel;
return;
}

fn void! UnbufferedChannel.destroy(&self)
{
UnbufferedChannelImpl* channel = (UnbufferedChannelImpl*)(*self);
channel.destroy()!;
return;
}

fn void! UnbufferedChannel.push(&self, Type val)
{
UnbufferedChannelImpl* channel = (UnbufferedChannelImpl*)(*self);
return channel.push(val);
}

fn Type! UnbufferedChannel.pop(&self)
{
UnbufferedChannelImpl* channel = (UnbufferedChannelImpl*)(*self);
return channel.pop();
}

fn void! UnbufferedChannel.close(&self)
{
UnbufferedChannelImpl* channel = (UnbufferedChannelImpl*)(*self);
return channel.close();
}

struct UnbufferedChannelImpl @private
{
Allocator allocator;
Mutex mu;
Expand All @@ -16,7 +51,7 @@ struct UnbufferedChannel @private
ConditionVariable read_cond;
}

fn void! UnbufferedChannel.init(&self, Allocator allocator) @private
fn void! UnbufferedChannelImpl.init(&self, Allocator allocator) @private
{
self.allocator = allocator;
self.send_waiting = 0;
Expand All @@ -34,7 +69,7 @@ fn void! UnbufferedChannel.init(&self, Allocator allocator) @private
defer catch (void)self.read_cond.destroy();
}

fn void! UnbufferedChannel.destroy(&self) @private
fn void! UnbufferedChannelImpl.destroy(&self) @private
{
anyfault err = @catch(self.mu.destroy());
err = @catch(self.send_mu.destroy()) ?: err;
Expand All @@ -46,7 +81,7 @@ fn void! UnbufferedChannel.destroy(&self) @private
if (err) return err?;
}

fn void! UnbufferedChannel.push(&self, Type val) @private
fn void! UnbufferedChannelImpl.push(&self, Type val) @private
{
self.mu.lock()!;
defer catch (void)self.mu.unlock();
Expand Down Expand Up @@ -80,7 +115,7 @@ fn void! UnbufferedChannel.push(&self, Type val) @private
return;
}

fn Type! UnbufferedChannel.pop(&self) @private
fn Type! UnbufferedChannelImpl.pop(&self) @private
{
self.mu.lock()!;
defer catch (void)self.mu.unlock();
Expand Down Expand Up @@ -111,7 +146,7 @@ fn Type! UnbufferedChannel.pop(&self) @private
return ret;
}

fn void! UnbufferedChannel.close(&self) @private
fn void! UnbufferedChannelImpl.close(&self) @private
{
anyfault err = @catch(self.mu.lock());

Expand Down
Loading

0 comments on commit a624054

Please sign in to comment.