Skip to content

Commit

Permalink
inline functions
Browse files Browse the repository at this point in the history
  • Loading branch information
Snikimonkd committed Jan 19, 2025
1 parent 3f1df22 commit 1534d9b
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 180 deletions.
168 changes: 76 additions & 92 deletions lib/std/threads/buffered_channel.c3
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,6 @@ module std::thread::channel(<Type>);

distinct BufferedChannel = void*;

fn void! BufferedChannel.new_init(&self, usz size = 1, Allocator allocator = allocator::heap())
{
BufferedChannelImpl* channel = allocator::new_with_padding(allocator, BufferedChannelImpl, Type.sizeof * size)!;
channel.init(size, allocator)!;
*self = (BufferedChannel)channel;
return;
}

fn void! BufferedChannel.destroy(&self)
{
BufferedChannelImpl* channel = (BufferedChannelImpl*)(*self);
channel.destroy()!;
*self = null;
return;
}

fn void! BufferedChannel.push(self, Type val) @inline
{
return ((BufferedChannelImpl*)self).push(val) @inline;
}

fn Type! BufferedChannel.pop(self) @inline
{
return ((BufferedChannelImpl*)self).pop() @inline;
}

fn void! BufferedChannel.close(self) @inline
{
return ((BufferedChannelImpl*)self).close() @inline;
}

struct BufferedChannelImpl @private
{
Allocator allocator;
Expand All @@ -52,130 +21,145 @@ struct BufferedChannelImpl @private
Type[*] buf;
}

fn void! BufferedChannelImpl.init(&self, usz size, Allocator allocator) @private
fn void! BufferedChannel.new_init(&self, usz size = 1, Allocator allocator = allocator::heap())
{
self.allocator = allocator;
self.size = size;
self.elems = 0;
self.sendx = 0;
self.send_waiting = 0;
self.readx = 0;
self.read_waiting = 0;

self.mu.init()!;
defer catch (void)self.mu.destroy();
self.send_cond.init()!;
defer catch (void)self.send_cond.destroy();
self.read_cond.init()!;
defer catch (void)self.read_cond.destroy();
BufferedChannelImpl* channel = allocator::new_with_padding(allocator, BufferedChannelImpl, Type.sizeof * size)!;

channel.allocator = allocator;
channel.size = size;
channel.elems = 0;
channel.sendx = 0;
channel.send_waiting = 0;
channel.readx = 0;
channel.read_waiting = 0;

channel.mu.init()!;
defer catch (void)channel.mu.destroy();
channel.send_cond.init()!;
defer catch (void)channel.send_cond.destroy();
channel.read_cond.init()!;
defer catch (void)channel.read_cond.destroy();

*self = (BufferedChannel)channel;

return;
}

fn void! BufferedChannelImpl.destroy(&self) @private
fn void! BufferedChannel.destroy(&self)
{
anyfault err = @catch(self.mu.destroy());
err = @catch(self.send_cond.destroy()) ?: err;
err = @catch(self.read_cond.destroy()) ?: err;
allocator::free(self.allocator, self);
BufferedChannelImpl* channel = (BufferedChannelImpl*)(*self);

anyfault err = @catch(channel.mu.destroy());
err = @catch(channel.send_cond.destroy()) ?: err;
err = @catch(channel.read_cond.destroy()) ?: err;
allocator::free(channel.allocator, channel);

*self = null;

if (err) return err?;
}

fn void! BufferedChannelImpl.push(&self, Type val) @private
fn void! BufferedChannel.push(self, Type val)
{
self.mu.lock()!;
defer catch (void)self.mu.unlock();
BufferedChannelImpl* channel = (BufferedChannelImpl*)self;

channel.mu.lock()!;
defer catch (void)channel.mu.unlock();

// if chan is full -> wait
while (self.elems == self.size && !self.closed)
// if channel is full -> wait
while (channel.elems == channel.size && !channel.closed)
{
self.send_waiting++;
self.send_cond.wait(&self.mu)!;
self.send_waiting--;
channel.send_waiting++;
channel.send_cond.wait(&channel.mu)!;
channel.send_waiting--;
}

// check if chan is closed
if (self.closed)
// check if channel is closed
if (channel.closed)
{
return ThreadFault.CHANNEL_CLOSED?;
}

// save value to buf
self.buf[self.sendx] = val;
channel.buf[channel.sendx] = val;

// move pointer
self.sendx++;
if (self.sendx == self.size)
channel.sendx++;
if (channel.sendx == channel.size)
{
self.sendx = 0;
channel.sendx = 0;
}

// change elems counter
self.elems++;
// channelge elems counter
channel.elems++;

// if someone is waiting -> awake him
if (self.read_waiting > 0)
if (channel.read_waiting > 0)
{
self.read_cond.signal()!;
channel.read_cond.signal()!;
}

self.mu.unlock()!;
channel.mu.unlock()!;

return;
}

fn Type! BufferedChannelImpl.pop(&self) @private
fn Type! BufferedChannel.pop(self)
{
self.mu.lock()!;
defer catch (void)self.mu.unlock();
BufferedChannelImpl* channel = (BufferedChannelImpl*)self;

channel.mu.lock()!;
defer catch (void)channel.mu.unlock();

// if chan is empty -> wait for sender
while (self.elems == 0 && !self.closed)
while (channel.elems == 0 && !channel.closed)
{
self.read_waiting++;
self.read_cond.wait(&self.mu)!;
self.read_waiting--;
channel.read_waiting++;
channel.read_cond.wait(&channel.mu)!;
channel.read_waiting--;
}

// check if chan is closed and empty
if (self.closed && self.elems == 0)
if (channel.closed && channel.elems == 0)
{
return ThreadFault.CHANNEL_CLOSED?;
}

// read from buf
Type ret = self.buf[self.readx];
Type ret = channel.buf[channel.readx];

// move pointer
self.readx++;
if (self.readx == self.size)
channel.readx++;
if (channel.readx == channel.size)
{
self.readx = 0;
channel.readx = 0;
}

// change elems counter
self.elems--;
channel.elems--;

// if someone is waiting -> awake him
if (self.send_waiting > 0)
if (channel.send_waiting > 0)
{
self.send_cond.signal()!;
channel.send_cond.signal()!;
}

self.mu.unlock()!;
channel.mu.unlock()!;

return ret;
}

fn void! BufferedChannelImpl.close(&self) @private
fn void! BufferedChannel.close(self)
{
anyfault err = @catch(self.mu.lock());
BufferedChannelImpl* channel = (BufferedChannelImpl*)self;

self.closed = true;
anyfault err = @catch(channel.mu.lock());

err = @catch(self.read_cond.broadcast()) ?: err;
err = @catch(self.send_cond.broadcast()) ?: err;
channel.closed = true;

err = @catch(self.mu.unlock()) ?: err;
err = @catch(channel.read_cond.broadcast()) ?: err;
err = @catch(channel.send_cond.broadcast()) ?: err;
err = @catch(channel.mu.unlock()) ?: err;

if (err) return err?;
}
Expand Down
Loading

0 comments on commit 1534d9b

Please sign in to comment.