From 0203eb5aab2d1d763633df33b274319fcf4cece7 Mon Sep 17 00:00:00 2001 From: elijahr Date: Sat, 28 Sep 2024 14:58:57 -0500 Subject: [PATCH] wip --- lockfreequeues.nimble | 6 +- src/lockfreequeues/constants.nim | 5 + src/lockfreequeues/exceptions.nim | 3 - src/lockfreequeues/mupmuc.nim | 183 +++++++++++++++++++++++++++--- src/lockfreequeues/mupsic.nim | 125 ++++++++++++++------ src/lockfreequeues/ops.nim | 23 ++-- src/lockfreequeues/sipsic.nim | 38 +------ src/lockfreequeues/state.nim | 15 +++ tests/t_mupmuc.nim | 13 --- tests/t_mupsic.nim | 13 --- tests/utils.nim | 5 + 11 files changed, 303 insertions(+), 126 deletions(-) create mode 100644 src/lockfreequeues/state.nim create mode 100644 tests/utils.nim diff --git a/lockfreequeues.nimble b/lockfreequeues.nimble index 8232ef8..6733ac2 100644 --- a/lockfreequeues.nimble +++ b/lockfreequeues.nimble @@ -13,10 +13,10 @@ requires "nim >= 1.2.0" # Tasks task test, "Runs the test suite": # C - exec "nim c -r -f tests/test.nim" + # exec "nim c -r -f tests/test.nim" - # C++ - exec "nim cpp -r -f tests/test.nim" + # # C++ + # exec "nim cpp -r -f tests/test.nim" if getEnv("SANITIZE_THREADS") != "no": # C (with thread sanitization) diff --git a/src/lockfreequeues/constants.nim b/src/lockfreequeues/constants.nim index a7ce911..122a8a7 100644 --- a/src/lockfreequeues/constants.nim +++ b/src/lockfreequeues/constants.nim @@ -6,8 +6,13 @@ ## Constants used by lockfreequeues +import options + # The size of a cache line (128 bytes on PowerPC, 64 bytes elsewhere) const CacheLineBytes* {.intdefine.} = when defined(powerpc): 128 else: 64 + +const NoSlice* = none(HSlice[int, int]) + diff --git a/src/lockfreequeues/exceptions.nim b/src/lockfreequeues/exceptions.nim index 3fb2dd0..bc00333 100644 --- a/src/lockfreequeues/exceptions.nim +++ b/src/lockfreequeues/exceptions.nim @@ -8,6 +8,3 @@ type NoProducersAvailableError* = object of QueueError ## \ type NoConsumersAvailableError* = object of QueueError ## \ ## Raised by `getConsumer()` if all producers have been assigned to other ## threads. -type InvalidCallDefect* = object of Defect ## \ - ## Raised by `Mupsic.push()`, `Mupmuc.push()`, and `Mupmuc.pop()` because - ## those should happen via `Producer.push()` or `Consumer.pop()`. diff --git a/src/lockfreequeues/mupmuc.nim b/src/lockfreequeues/mupmuc.nim index 6363534..28bedcd 100644 --- a/src/lockfreequeues/mupmuc.nim +++ b/src/lockfreequeues/mupmuc.nim @@ -16,12 +16,17 @@ import ./atomic_dsl import ./exceptions import ./ops import ./mupsic +import ./sipsic +import ./constants const NoConsumerIdx* = -1 ## The initial value of `Mupmuc.prevConsumerIdx`. type + Reservation*[Q] = object + threadId: int + Mupmuc*[N, P, C: static int, T] = object of Mupsic[N, P, T] ## A multi-producer, multi-consumer bounded queue implemented as a ring ## buffer. @@ -36,11 +41,12 @@ type consumerThreadIds*: array[C, Atomic[int]] ## \ ## Array of consumer thread IDs by index - Consumer*[N, P, C: static int, T] = object + + Consumer*[Q] = object ## A per-thread interface for popping items from a queue. ## Retrieved via a call to `Mupmuc.getConsumer()` idx*: int ## The consumer's unique identifier. - queuePtr*: ptr Mupmuc[N, P, C, T] ## A ptr to the consumer's queue. + queuePtr*: ptr Q ## A ptr to the consumer's queue. proc clear*[N, P, C: static int, T]( @@ -82,10 +88,159 @@ proc initMupmuc*[N, P, C: static int, T](): Mupmuc[N, P, C, T] = result.consumerThreadIds[c].sequential(0) +proc push*[N, P, C: static int, T]( + self: var Producer[Mupmuc[N, P, C, T]], + item: T, +): bool = + ## Append a single item to the queue. + ## If the queue is full, `false` is returned. + ## If `item` is appended, `true` is returned. + + var prevTail: int + var newTail: int + var prevProducerIdx: int + var isFirstProduction: bool + + # spin until reservation is acquired + while true: + prevProducerIdx = self.queuePtr.prevProducerIdx.acquire + isFirstProduction = prevProducerIdx == NoProducerIdx + var head = self.queuePtr.head.sequential + prevTail = + if unlikely(isFirstProduction): + 0 + else: + self.queuePtr.producerTails[prevProducerIdx].acquire + + if unlikely(full(head, prevTail, N)): + return false + + newTail = incOrReset(prevTail, 1, N) + # validateHeadAndTail(head, newTail, N) + self.queuePtr.producerTails[self.idx].release(newTail) + + if self.queuePtr.prevProducerIdx.compareExchangeWeak( + prevProducerIdx, + self.idx, + moRelease, + moAcquire, + ): + break + + result = true + + self.queuePtr.storage[index(prevTail, N)] = item + + if unlikely(isFirstProduction): + self.queuePtr.tail.release(newTail) + else: + # Wait for prev producer to update tail, then update tail + while true: + var expectedTail = prevTail + if self.queuePtr.tail.compareExchangeWeak( + expectedTail, + newTail, + moRelease, + moAcquire, + ): + break + + +proc push*[N, P, C: static int, T]( + self: var Producer[Mupmuc[N, P, C, T]], + items: openArray[T], +): Option[HSlice[int, int]] = + ## Append multiple items to the queue. + ## If the queue is already full or is filled by this call, `some(unpushed)` + ## is returned, where `unpushed` is an `HSlice` corresponding to the + ## chunk of items which could not be pushed. + ## If all items are appended, `none(HSlice[int, int])` is returned. + if unlikely(items.len == 0): + # items is empty, nothing unpushed + return NoSlice + + var count: int + var avail: int + var prevTail: int + var newTail: int + var prevProducerIdx: int + var isFirstProduction: bool + + # spin until reservation is acquired + while true: + prevProducerIdx = self.queuePtr.prevProducerIdx.acquire + isFirstProduction = prevProducerIdx == NoProducerIdx + var head = self.queuePtr.head.sequential + prevTail = + if isFirstProduction: + 0 + else: + self.queuePtr.producerTails[prevProducerIdx].acquire + + avail = available(head, prevTail, N) + if likely(avail >= items.len): + # enough room to push all items + count = items.len + else: + if avail <= 0: + # Queue is full, return + return some(0..items.len - 1) + else: + # not enough room to push all items + count = avail + + newTail = incOrReset(prevTail, count, N) + # validateHeadAndTail(head, newTail, N) + self.queuePtr.producerTails[self.idx].release(newTail) + + if self.queuePtr.prevProducerIdx.compareExchangeWeak( + prevProducerIdx, + self.idx, + moRelease, + moAcquire, + ): + break + + if count < items.len: + # give back remainder + result = some(avail..items.len - 1) + else: + result = NoSlice + + let start = index(prevTail, N) + var stop = incOrReset(prevTail, count - 1, N) + stop = index(stop, N) + + if start > stop: + # data may wrap + let pivot = (N-1) - start + self.queuePtr.storage[start..start+pivot] = items[0..pivot] + if stop > 0: + # data wraps + self.queuePtr.storage[0..stop] = items[pivot+1..pivot+1+stop] + else: + # data does not wrap + self.queuePtr.storage[start..stop] = items[0..stop-start] + + if unlikely(isFirstProduction): + self.queuePtr.tail.release(newTail) + else: + # Wait for prev producer to update tail, then update tail + while true: + var expectedTail = prevTail + if self.queuePtr.tail.compareExchangeWeak( + expectedTail, + newTail, + moRelease, + moAcquire, + ): + break + + proc getConsumer*[N, P, C: static int, T]( self: var Mupmuc[N, P, C, T], idx: int = NoConsumerIdx, -): Consumer[N, P, C, T] +): Consumer[Mupmuc[N, P, C, T]] {.raises: [NoConsumersAvailableError].} = ## Assigns and returns a `Consumer` instance for the current thread. result.queuePtr = addr self @@ -123,7 +278,7 @@ proc getConsumer*[N, P, C: static int, T]( proc pop*[N, P, C: static int, T]( - self: var Consumer[N, P, C, T], + self: var Consumer[Mupmuc[N, P, C, T]], ): Option[T] = ## Pop a single item from the queue. ## If the queue is empty, `none(T)` is returned. @@ -147,7 +302,8 @@ proc pop*[N, P, C: static int, T]( if unlikely(empty(prevHead, tail, N)): if prevHead != tail: - echo "empty but prevHead and tail are not equal! prevHead=" & $prevHead & " tail=" & $tail + echo "empty but prevHead and tail are not equal! prevHead=" & + $prevHead & " tail=" & $tail return none(T) newHead = incOrReset(prevHead, 1, N) @@ -179,7 +335,7 @@ proc pop*[N, P, C: static int, T]( proc pop*[N, P, C: static int, T]( - self: var Consumer[N, P, C, T], + self: var Consumer[Mupmuc[N, P, C, T]], count: int, ): Option[seq[T]] = ## Pop `count` items from the queue. @@ -197,6 +353,8 @@ proc pop*[N, P, C: static int, T]( var isFirstConsumption: bool var tail: int + # hmmmmm + # spin until reservation is acquired while true: prevConsumerIdx = self.queuePtr.prevConsumerIdx.acquire @@ -267,19 +425,13 @@ proc pop*[N, P, C: static int, T]( proc pop*[N, P, C: static int, T]( self: var Mupmuc[N, P, C, T], -): bool = - ## Overload of `Sipsic.pop()` that simply raises `InvalidCallDefect`. - ## Pops should happen via `Consumer.pop()`. - raise newException(InvalidCallDefect, "Use Consumer.pop()") +): bool {.error: "Use Consumer.pop()".} proc pop*[N, P, C: static int, T]( self: var Mupmuc[N, P, C, T], count: int, -): Option[seq[T]] = - ## Overload of `Sipsic.pop()` that simply raises `InvalidCallDefect`. - ## Pops should happen via `Consumer.pop()`. - raise newException(InvalidCallDefect, "Use Consumer.pop()") +): Option[seq[T]] {.error: "Use Consumer.pop()".} proc consumerCount*[N, P, C: static int, T]( @@ -290,7 +442,8 @@ proc consumerCount*[N, P, C: static int, T]( result = C -proc `=copy`*[N, P, C: static int, T](a: var Mupmuc[N, P, C, T], b: Mupmuc[N, P, C, T]) {.error.} +proc `=copy`*[N, P, C: static int, T](a: var Mupmuc[N, P, C, T], b: Mupmuc[N, P, + C, T]) {.error.} when defined(testing): diff --git a/src/lockfreequeues/mupsic.nim b/src/lockfreequeues/mupsic.nim index 2daaf0a..4f651f1 100644 --- a/src/lockfreequeues/mupsic.nim +++ b/src/lockfreequeues/mupsic.nim @@ -15,12 +15,11 @@ import options import ./atomic_dsl import ./exceptions import ./ops -import ./sipsic const NoProducerIdx* = -1 ## The initial value of `Mupsic.prevProducerIdx`. type - Mupsic*[N, P: static int, T] = object of Sipsic[N, T] + Mupsic*[N, P: static int, T] = object of RootObj ## A multi-producer, single-consumer bounded queue implemented as a ring ## buffer. Popping is wait-free. ## @@ -28,27 +27,27 @@ type ## * `P` is the number of producer threads. ## * `T` is the type of data the queue will hold. - prevProducerIdx*: Atomic[int] ## The ID (index) of the most recent producer - producerTails*: array[P, Atomic[int]] ## Array of producer tails + head*: Atomic[int] + prevProducerIdx*: Atomic[int] ## The ID (index) of the most recent producer. + producerTails*: array[P, Atomic[int]] ## Array of producer tails. producerThreadIds*: array[P, Atomic[int]] ## \ - ## Array of producer thread IDs by index + ## Array of producer thread IDs by index. producerReleases*: array[P, Atomic[bool]] ## \ - ## Array indicating whether producers have been released by release() + ## Array indicating whether producers have been released by release(). - # MupsicRef*[N, P: static int, T] = ref Mupsic[N, P, T] + storage*: array[N, T] ## The underlying storage. - Producer*[N, P: static int, T] = object + Producer*[Q] = object ## A per-thread interface for pushing items to a queue. - ## Retrieved via a call to `Mupsic.getProducer()` + ## Retrieved via a call to `Mupmuc.getProducer()` idx*: int ## The producer's unique identifier. - queuePtr*: ptr Mupsic[N, P, T] ## A pointer to the producer's queue. + queuePtr*: ptr Q ## A pointer to the producer's queue. proc clear*[N, P: static int, T]( self: var Mupsic[N, P, T] ) = self.head.sequential(0) - self.tail.sequential(0) for n in 0..= count): + # Enough items to fulfill request + actualCount = count + elif used <= 0: + # Queue is empty, return nothing + return none(seq[T]) + else: + # Not enough items to fulfill request + actualCount = min(used, N) + + var res = newSeq[T](actualCount) + let headIndex = index(head, N) + let newHead = incOrReset(head, actualCount, N) + + let newHeadIndex = index(newHead, N) + + if headIndex < newHeadIndex: + # request does not wrap + for i in 0.. 0: + # request wraps + for j in 0..= 2*capacity): - raise newException(QueueIndexError, "value=" & $value & " must be > 0 and <= 2*" & $capacity) + raise newException(QueueIndexError, "value=" & $value & + " must be > 0 and <= 2*" & $capacity) elif value >= capacity: result = value - capacity else: @@ -30,9 +31,11 @@ proc incOrReset*( ## increment `original` by `amount`, or reset from zero if ## `original + amount >= 2 * capacity`. if unlikely(original < 0 or original >= 2*capacity): - raise newException(QueueIndexError, "original=" & $original & " must be > 0 and <= 2*" & $capacity) + raise newException(QueueIndexError, "original=" & $original & + " must be > 0 and <= 2*" & $capacity) elif unlikely(amount notin 0..capacity): - raise newException(QueueIndexError, "amount=" & $amount & " must be > 0 and < " & $capacity) + raise newException(QueueIndexError, "amount=" & $amount & + " must be > 0 and < " & $capacity) result = original + amount if unlikely(result >= 2 * capacity): result -= 2 * capacity @@ -56,9 +59,11 @@ proc used*( if unlikely(capacity < 0): raise newException(QueueIndexError, "capacity=" & $capacity & " must be > 0") elif unlikely(head < 0 or head >= 2*capacity): - raise newException(QueueIndexError, "head=" & $head & " must be > 0 and <= 2*" & $capacity) + raise newException(QueueIndexError, "head=" & $head & + " must be > 0 and <= 2*" & $capacity) elif unlikely(tail < 0 or tail >= 2*capacity): - raise newException(QueueIndexError, "tail=" & $tail & " must be > 0 and <= 2*" & $capacity) + raise newException(QueueIndexError, "tail=" & $tail & + " must be > 0 and <= 2*" & $capacity) if tail >= capacity: if head >= capacity: @@ -71,7 +76,11 @@ proc used*( else: result = tail - head if unlikely(result < 0 or result > capacity): - raise newException(QueueIndexError, "result=" & $result & " must be <= capacity=" & $capacity) + raise newException( + QueueIndexError, + "result=" & $result & " must be > 0 and <= " & $capacity & + " (head=" & $head & ", tail=" & $tail & ")", + ) proc available*( diff --git a/src/lockfreequeues/sipsic.nim b/src/lockfreequeues/sipsic.nim index 96543ee..0c42973 100644 --- a/src/lockfreequeues/sipsic.nim +++ b/src/lockfreequeues/sipsic.nim @@ -13,9 +13,7 @@ import options import ./constants import ./ops import ./atomic_dsl - - -const NoSlice* = none(HSlice[int, int]) +import ./state type @@ -136,19 +134,6 @@ proc push*[N: static int, T]( self.tail.release(newTail) -# proc push*[N: static int, T]( -# self: ref Sipsic[N, T], -# items: openArray[T], -# ): Option[HSlice[int, int]] = -# ## Append multiple items to the queue. -# ## If the queue is already full or is filled by this call, `some(unpushed)` -# ## is returned, where `unpushed` is an `HSlice` corresponding to the -# ## chunk of items which could not be pushed. -# ## If all items are appended, `NoSlice` is returned. -# var self = self -# result = self.push(items) - - proc pop*[N: static int, T]( self: var Sipsic[N, T], ): Option[T] = @@ -170,16 +155,6 @@ proc pop*[N: static int, T]( self.head.release(newHead) -# proc pop*[N: static int, T]( -# self: ref Sipsic[N, T], -# ): Option[T] = -# ## Pop a single item from the queue. -# ## If the queue is empty, `none(T)` is returned. -# ## Otherwise an item is popped, `some(T)` is returned. -# var self = self -# result = self.pop() - - proc pop*[N: static int, T]( self: var Sipsic[N, T], count: int, @@ -230,17 +205,6 @@ proc pop*[N: static int, T]( self.head.release(newHead) -proc pop*[N: static int, T]( - self: ref Sipsic[N, T], - count: int, -): Option[seq[T]] = - ## Pop `count` items from the queue. - ## If the queue is empty, `none(seq[T])` is returned. - ## Otherwise `some(seq[T])` is returned containing at least one item. - var self = self - result = self.pop(count) - - proc capacity*[N: static int, T]( self: var Sipsic[N, T], ): int diff --git a/src/lockfreequeues/state.nim b/src/lockfreequeues/state.nim new file mode 100644 index 0000000..6ca000a --- /dev/null +++ b/src/lockfreequeues/state.nim @@ -0,0 +1,15 @@ +# lockfreequeues +# © Copyright 2020 Elijah Shaw-Rutschman +# +# See the file "LICENSE", included in this distribution for details about the +# copyright. + +## A single-producer, single-consumer bounded queue implemented as a ring +## buffer. + + +type + State* = object + ## An object which holds the head and tail indices for a queue. + head*: int + tail*: int diff --git a/tests/t_mupmuc.nim b/tests/t_mupmuc.nim index cdba22d..5929114 100644 --- a/tests/t_mupmuc.nim +++ b/tests/t_mupmuc.nim @@ -54,19 +54,6 @@ suite "getProducer(Mupmuc[N, P, C, T])": testMupGetProducerThrowsNoProducersAvailable(queue) -suite "push(Mupmuc[N, P, C, T])": - setup: - queue.reset() - - test "seq[T] should fail": - expect InvalidCallDefect: - discard queue.push(1) - - test "T should fail": - expect InvalidCallDefect: - discard queue.push(@[1]) - - suite "push(Producer[N, P, T], T)": setup: queue.reset() diff --git a/tests/t_mupsic.nim b/tests/t_mupsic.nim index bf63959..8147efc 100644 --- a/tests/t_mupsic.nim +++ b/tests/t_mupsic.nim @@ -53,19 +53,6 @@ suite "getProducer(Mupsic[N, P, T])": testMupGetProducerThrowsNoProducersAvailable(queue) -suite "push(Mupsic[N, P, T])": - setup: - queue.reset() - - test "seq[T] should fail": - expect InvalidCallDefect: - discard queue.push(1) - - test "T should fail": - expect InvalidCallDefect: - discard queue.push(@[1]) - - suite "push(Producer[N, P, T], T)": setup: queue.reset() diff --git a/tests/utils.nim b/tests/utils.nim new file mode 100644 index 0000000..e880b24 --- /dev/null +++ b/tests/utils.nim @@ -0,0 +1,5 @@ + + + +template log*(msg: string) = + echo "[" & $getThreadId() & "]: " & msg