Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
elijahr committed Sep 28, 2024
1 parent 3ddbdc6 commit 0203eb5
Show file tree
Hide file tree
Showing 11 changed files with 303 additions and 126 deletions.
6 changes: 3 additions & 3 deletions lockfreequeues.nimble
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ requires "nim >= 1.2.0"
# Tasks
task test, "Runs the test suite":
# C
exec "nim c -r -f tests/test.nim"
# exec "nim c -r -f tests/test.nim"

# C++
exec "nim cpp -r -f tests/test.nim"
# # C++
# exec "nim cpp -r -f tests/test.nim"

if getEnv("SANITIZE_THREADS") != "no":
# C (with thread sanitization)
Expand Down
5 changes: 5 additions & 0 deletions src/lockfreequeues/constants.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@

## Constants used by lockfreequeues

import options

# The size of a cache line (128 bytes on PowerPC, 64 bytes elsewhere)
const CacheLineBytes* {.intdefine.} = when defined(powerpc):
128
else:
64

const NoSlice* = none(HSlice[int, int])

3 changes: 0 additions & 3 deletions src/lockfreequeues/exceptions.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,3 @@ type NoProducersAvailableError* = object of QueueError ## \
type NoConsumersAvailableError* = object of QueueError ## \
## Raised by `getConsumer()` if all producers have been assigned to other
## threads.
type InvalidCallDefect* = object of Defect ## \
## Raised by `Mupsic.push()`, `Mupmuc.push()`, and `Mupmuc.pop()` because
## those should happen via `Producer.push()` or `Consumer.pop()`.
183 changes: 168 additions & 15 deletions src/lockfreequeues/mupmuc.nim
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@ import ./atomic_dsl
import ./exceptions
import ./ops
import ./mupsic
import ./sipsic
import ./constants


const NoConsumerIdx* = -1 ## The initial value of `Mupmuc.prevConsumerIdx`.


type
Reservation*[Q] = object
threadId: int

Mupmuc*[N, P, C: static int, T] = object of Mupsic[N, P, T]
## A multi-producer, multi-consumer bounded queue implemented as a ring
## buffer.
Expand All @@ -36,11 +41,12 @@ type
consumerThreadIds*: array[C, Atomic[int]] ## \
## Array of consumer thread IDs by index

Consumer*[N, P, C: static int, T] = object

Consumer*[Q] = object
## A per-thread interface for popping items from a queue.
## Retrieved via a call to `Mupmuc.getConsumer()`
idx*: int ## The consumer's unique identifier.
queuePtr*: ptr Mupmuc[N, P, C, T] ## A ptr to the consumer's queue.
queuePtr*: ptr Q ## A ptr to the consumer's queue.


proc clear*[N, P, C: static int, T](
Expand Down Expand Up @@ -82,10 +88,159 @@ proc initMupmuc*[N, P, C: static int, T](): Mupmuc[N, P, C, T] =
result.consumerThreadIds[c].sequential(0)


proc push*[N, P, C: static int, T](
self: var Producer[Mupmuc[N, P, C, T]],
item: T,
): bool =
## Append a single item to the queue.
## If the queue is full, `false` is returned.
## If `item` is appended, `true` is returned.

var prevTail: int
var newTail: int
var prevProducerIdx: int
var isFirstProduction: bool

# spin until reservation is acquired
while true:
prevProducerIdx = self.queuePtr.prevProducerIdx.acquire
isFirstProduction = prevProducerIdx == NoProducerIdx
var head = self.queuePtr.head.sequential
prevTail =
if unlikely(isFirstProduction):
0
else:
self.queuePtr.producerTails[prevProducerIdx].acquire

if unlikely(full(head, prevTail, N)):
return false

newTail = incOrReset(prevTail, 1, N)
# validateHeadAndTail(head, newTail, N)
self.queuePtr.producerTails[self.idx].release(newTail)

if self.queuePtr.prevProducerIdx.compareExchangeWeak(
prevProducerIdx,
self.idx,
moRelease,
moAcquire,
):
break

result = true

self.queuePtr.storage[index(prevTail, N)] = item

if unlikely(isFirstProduction):
self.queuePtr.tail.release(newTail)
else:
# Wait for prev producer to update tail, then update tail
while true:
var expectedTail = prevTail
if self.queuePtr.tail.compareExchangeWeak(
expectedTail,
newTail,
moRelease,
moAcquire,
):
break


proc push*[N, P, C: static int, T](
self: var Producer[Mupmuc[N, P, C, T]],
items: openArray[T],
): Option[HSlice[int, int]] =
## Append multiple items to the queue.
## If the queue is already full or is filled by this call, `some(unpushed)`
## is returned, where `unpushed` is an `HSlice` corresponding to the
## chunk of items which could not be pushed.
## If all items are appended, `none(HSlice[int, int])` is returned.
if unlikely(items.len == 0):
# items is empty, nothing unpushed
return NoSlice

var count: int
var avail: int
var prevTail: int
var newTail: int
var prevProducerIdx: int
var isFirstProduction: bool

# spin until reservation is acquired
while true:
prevProducerIdx = self.queuePtr.prevProducerIdx.acquire
isFirstProduction = prevProducerIdx == NoProducerIdx
var head = self.queuePtr.head.sequential
prevTail =
if isFirstProduction:
0
else:
self.queuePtr.producerTails[prevProducerIdx].acquire

avail = available(head, prevTail, N)
if likely(avail >= items.len):
# enough room to push all items
count = items.len
else:
if avail <= 0:
# Queue is full, return
return some(0..items.len - 1)
else:
# not enough room to push all items
count = avail

newTail = incOrReset(prevTail, count, N)
# validateHeadAndTail(head, newTail, N)
self.queuePtr.producerTails[self.idx].release(newTail)

if self.queuePtr.prevProducerIdx.compareExchangeWeak(
prevProducerIdx,
self.idx,
moRelease,
moAcquire,
):
break

if count < items.len:
# give back remainder
result = some(avail..items.len - 1)
else:
result = NoSlice

let start = index(prevTail, N)
var stop = incOrReset(prevTail, count - 1, N)
stop = index(stop, N)

if start > stop:
# data may wrap
let pivot = (N-1) - start
self.queuePtr.storage[start..start+pivot] = items[0..pivot]
if stop > 0:
# data wraps
self.queuePtr.storage[0..stop] = items[pivot+1..pivot+1+stop]
else:
# data does not wrap
self.queuePtr.storage[start..stop] = items[0..stop-start]

if unlikely(isFirstProduction):
self.queuePtr.tail.release(newTail)
else:
# Wait for prev producer to update tail, then update tail
while true:
var expectedTail = prevTail
if self.queuePtr.tail.compareExchangeWeak(
expectedTail,
newTail,
moRelease,
moAcquire,
):
break


proc getConsumer*[N, P, C: static int, T](
self: var Mupmuc[N, P, C, T],
idx: int = NoConsumerIdx,
): Consumer[N, P, C, T]
): Consumer[Mupmuc[N, P, C, T]]
{.raises: [NoConsumersAvailableError].} =
## Assigns and returns a `Consumer` instance for the current thread.
result.queuePtr = addr self
Expand Down Expand Up @@ -123,7 +278,7 @@ proc getConsumer*[N, P, C: static int, T](


proc pop*[N, P, C: static int, T](
self: var Consumer[N, P, C, T],
self: var Consumer[Mupmuc[N, P, C, T]],
): Option[T] =
## Pop a single item from the queue.
## If the queue is empty, `none(T)` is returned.
Expand All @@ -147,7 +302,8 @@ proc pop*[N, P, C: static int, T](

if unlikely(empty(prevHead, tail, N)):
if prevHead != tail:
echo "empty but prevHead and tail are not equal! prevHead=" & $prevHead & " tail=" & $tail
echo "empty but prevHead and tail are not equal! prevHead=" &
$prevHead & " tail=" & $tail
return none(T)

newHead = incOrReset(prevHead, 1, N)
Expand Down Expand Up @@ -179,7 +335,7 @@ proc pop*[N, P, C: static int, T](


proc pop*[N, P, C: static int, T](
self: var Consumer[N, P, C, T],
self: var Consumer[Mupmuc[N, P, C, T]],
count: int,
): Option[seq[T]] =
## Pop `count` items from the queue.
Expand All @@ -197,6 +353,8 @@ proc pop*[N, P, C: static int, T](
var isFirstConsumption: bool
var tail: int

# hmmmmm

# spin until reservation is acquired
while true:
prevConsumerIdx = self.queuePtr.prevConsumerIdx.acquire
Expand Down Expand Up @@ -267,19 +425,13 @@ proc pop*[N, P, C: static int, T](

proc pop*[N, P, C: static int, T](
self: var Mupmuc[N, P, C, T],
): bool =
## Overload of `Sipsic.pop()` that simply raises `InvalidCallDefect`.
## Pops should happen via `Consumer.pop()`.
raise newException(InvalidCallDefect, "Use Consumer.pop()")
): bool {.error: "Use Consumer.pop()".}


proc pop*[N, P, C: static int, T](
self: var Mupmuc[N, P, C, T],
count: int,
): Option[seq[T]] =
## Overload of `Sipsic.pop()` that simply raises `InvalidCallDefect`.
## Pops should happen via `Consumer.pop()`.
raise newException(InvalidCallDefect, "Use Consumer.pop()")
): Option[seq[T]] {.error: "Use Consumer.pop()".}


proc consumerCount*[N, P, C: static int, T](
Expand All @@ -290,7 +442,8 @@ proc consumerCount*[N, P, C: static int, T](
result = C


proc `=copy`*[N, P, C: static int, T](a: var Mupmuc[N, P, C, T], b: Mupmuc[N, P, C, T]) {.error.}
proc `=copy`*[N, P, C: static int, T](a: var Mupmuc[N, P, C, T], b: Mupmuc[N, P,
C, T]) {.error.}


when defined(testing):
Expand Down
Loading

0 comments on commit 0203eb5

Please sign in to comment.