Skip to content

Commit

Permalink
Add SPSCQueue/SPSCQueueOf
Browse files Browse the repository at this point in the history
  • Loading branch information
puzpuzpuz committed Jan 25, 2025
1 parent ac72527 commit 2ef2c05
Show file tree
Hide file tree
Showing 12 changed files with 721 additions and 151 deletions.
34 changes: 28 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,28 +100,50 @@ m := NewMapOfWithHasher[int, int](func(i int, _ uint64) uint64 {

When benchmarking concurrent maps, make sure to configure all of the competitors with the same hash function or, at least, take hash function performance into consideration.

### SPSCQueue

A `SPSCQueue` is a bounded single-producer single-consumer concurrent queue. This means that at most one goroutine may be publishing items to the queue while at most one goroutine may be consuming those items.

```go
q := xsync.NewSPSCQueue(1024)
// producer inserts an item into the queue
// optimistic insertion attempt; doesn't block
inserted := q.TryEnqueue("bar")
// consumer obtains an item from the queue
// optimistic obtain attempt; doesn't block
item, ok := q.TryDequeue() // interface{} pointing to a string
```

`SPSCQueueOf[I]` is an implementation with parametrized item type. It is available for Go 1.19 or later.

```go
q := xsync.NewSPSCQueueOf[string](1024)
inserted := q.TryEnqueue("foo")
item, ok := q.TryDequeue() // string
```

The queue is based on the data structure from this [article](https://rigtorp.se/ringbuffer). The idea is to reduce CPU cache coherency traffic by keeping cached copies of the read and write indexes used by the producer and the consumer, respectively.

### MPMCQueue

A `MPMCQueue` is a bounded multi-producer multi-consumer concurrent queue.

```go
q := xsync.NewMPMCQueue(1024)
// producer inserts an item into the queue
q.Enqueue("foo")
// producer optimistically inserts an item into the queue
// optimistic insertion attempt; doesn't block
inserted := q.TryEnqueue("bar")
// consumer obtains an item from the queue
item := q.Dequeue() // interface{} pointing to a string
// optimistic obtain attempt; doesn't block
item, ok := q.TryDequeue()
item, ok := q.TryDequeue() // interface{} pointing to a string
```

`MPMCQueueOf[I]` is an implementation with parametrized item type. It is available for Go 1.19 or later.

```go
q := xsync.NewMPMCQueueOf[string](1024)
q.Enqueue("foo")
item := q.Dequeue() // string
inserted := q.TryEnqueue("foo")
item, ok := q.TryDequeue() // string
```

The queue is based on the algorithm from the [MPMCQueue](https://github.com/rigtorp/MPMCQueue) C++ library which, in turn, references D. Vyukov's [MPMC queue](https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue). According to the following [classification](https://www.1024cores.net/home/lock-free-algorithms/queues), the queue is array-based, fails on overflow, provides causal FIFO, and has blocking producers and consumers.
Expand Down
10 changes: 5 additions & 5 deletions map.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,11 +462,11 @@ func (m *Map) doCompute(
if b.next == nil {
if emptyb != nil {
// Insertion into an existing bucket.
var zeroedV interface{}
newValue, del := valueFn(zeroedV, false)
var zeroV interface{}
newValue, del := valueFn(zeroV, false)
if del {
unlockBucket(&rootb.topHashMutex)
return zeroedV, false
return zeroV, false
}
// First we update the value, then the key.
// This is important for atomic snapshot states.
Expand All @@ -486,8 +486,8 @@ func (m *Map) doCompute(
goto compute_attempt
}
// Insertion into a new bucket.
var zeroedV interface{}
newValue, del := valueFn(zeroedV, false)
var zeroV interface{}
newValue, del := valueFn(zeroV, false)
if del {
unlockBucket(&rootb.topHashMutex)
return newValue, false
Expand Down
10 changes: 5 additions & 5 deletions map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,11 +363,11 @@ func TestMapLoadOrCompute_FunctionCalledOnce(t *testing.T) {
}

func TestMapCompute(t *testing.T) {
var zeroedV interface{}
var zeroV interface{}
m := NewMap()
// Store a new value.
v, ok := m.Compute("foobar", func(oldValue interface{}, loaded bool) (newValue interface{}, delete bool) {
if oldValue != zeroedV {
if oldValue != zeroV {
t.Fatalf("oldValue should be empty interface{} when computing a new value: %d", oldValue)
}
if loaded {
Expand Down Expand Up @@ -420,8 +420,8 @@ func TestMapCompute(t *testing.T) {
}
// Try to delete a non-existing value. Notice different key.
v, ok = m.Compute("barbaz", func(oldValue interface{}, loaded bool) (newValue interface{}, delete bool) {
var zeroedV interface{}
if oldValue != zeroedV {
var zeroV interface{}
if oldValue != zeroV {
t.Fatalf("oldValue should be empty interface{} when trying to delete a non-existing value: %d", oldValue)
}
if loaded {
Expand All @@ -432,7 +432,7 @@ func TestMapCompute(t *testing.T) {
delete = true
return
})
if v != zeroedV {
if v != zeroV {
t.Fatalf("v should be empty interface{} when trying to delete a non-existing value: %d", v)
}
if ok {
Expand Down
10 changes: 5 additions & 5 deletions mapof.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,11 +405,11 @@ func (m *MapOf[K, V]) doCompute(
if b.next == nil {
if emptyb != nil {
// Insertion into an existing bucket.
var zeroedV V
newValue, del := valueFn(zeroedV, false)
var zeroV V
newValue, del := valueFn(zeroV, false)
if del {
rootb.mu.Unlock()
return zeroedV, false
return zeroV, false
}
newe := new(entryOf[K, V])
newe.key = key
Expand All @@ -429,8 +429,8 @@ func (m *MapOf[K, V]) doCompute(
goto compute_attempt
}
// Insertion into a new bucket.
var zeroedV V
newValue, del := valueFn(zeroedV, false)
var zeroV V
newValue, del := valueFn(zeroV, false)
if del {
rootb.mu.Unlock()
return newValue, false
Expand Down
56 changes: 22 additions & 34 deletions mpmcqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ func NewMPMCQueue(capacity int) *MPMCQueue {

// Enqueue inserts the given item into the queue.
// Blocks, if the queue is full.
//
// Deprecated: use TryEnqueue in combination with runtime.Gosched().
func (q *MPMCQueue) Enqueue(item interface{}) {
head := atomic.AddUint64(&q.head, 1) - 1
slot := &q.slots[q.idx(head)]
Expand All @@ -63,6 +65,8 @@ func (q *MPMCQueue) Enqueue(item interface{}) {

// Dequeue retrieves and removes the item from the head of the queue.
// Blocks, if the queue is empty.
//
// Deprecated: use TryDequeue in combination with runtime.Gosched().
func (q *MPMCQueue) Dequeue() interface{} {
tail := atomic.AddUint64(&q.tail, 1) - 1
slot := &q.slots[q.idx(tail)]
Expand All @@ -81,51 +85,35 @@ func (q *MPMCQueue) Dequeue() interface{} {
// full and the item was inserted.
func (q *MPMCQueue) TryEnqueue(item interface{}) bool {
head := atomic.LoadUint64(&q.head)
for {
slot := &q.slots[q.idx(head)]
turn := q.turn(head) * 2
if atomic.LoadUint64(&slot.turn) == turn {
if atomic.CompareAndSwapUint64(&q.head, head, head+1) {
slot.item = item
atomic.StoreUint64(&slot.turn, turn+1)
return true
}
} else {
prevHead := head
head = atomic.LoadUint64(&q.head)
if head == prevHead {
return false
}
slot := &q.slots[q.idx(head)]
turn := q.turn(head) * 2
if atomic.LoadUint64(&slot.turn) == turn {
if atomic.CompareAndSwapUint64(&q.head, head, head+1) {
slot.item = item
atomic.StoreUint64(&slot.turn, turn+1)
return true
}
runtime.Gosched()
}
return false
}

// TryDequeue retrieves and removes the item from the head of the
// queue. Does not block and returns immediately. The ok result
// indicates that the queue isn't empty and an item was retrieved.
func (q *MPMCQueue) TryDequeue() (item interface{}, ok bool) {
tail := atomic.LoadUint64(&q.tail)
for {
slot := &q.slots[q.idx(tail)]
turn := q.turn(tail)*2 + 1
if atomic.LoadUint64(&slot.turn) == turn {
if atomic.CompareAndSwapUint64(&q.tail, tail, tail+1) {
item = slot.item
ok = true
slot.item = nil
atomic.StoreUint64(&slot.turn, turn+1)
return
}
} else {
prevTail := tail
tail = atomic.LoadUint64(&q.tail)
if tail == prevTail {
return
}
slot := &q.slots[q.idx(tail)]
turn := q.turn(tail)*2 + 1
if atomic.LoadUint64(&slot.turn) == turn {
if atomic.CompareAndSwapUint64(&q.tail, tail, tail+1) {
item = slot.item
ok = true
slot.item = nil
atomic.StoreUint64(&slot.turn, turn+1)
return
}
runtime.Gosched()
}
return
}

func (q *MPMCQueue) idx(i uint64) uint64 {
Expand Down
60 changes: 30 additions & 30 deletions mpmcqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ import (
. "github.com/puzpuzpuz/xsync/v3"
)

func TestQueue_InvalidSize(t *testing.T) {
// TestMPMCQueue_InvalidSize checks that constructing a queue with a zero
// capacity panics: the test only passes when control never reaches the
// t.Fatal call placed after the constructor.
func TestMPMCQueue_InvalidSize(t *testing.T) {
	// Swallow the expected panic so that it does not fail the test.
	defer func() {
		_ = recover()
	}()
	_ = NewMPMCQueue(0)
	t.Fatal("no panic detected")
}

func TestQueueEnqueueDequeue(t *testing.T) {
func TestMPMCQueueEnqueueDequeue(t *testing.T) {
q := NewMPMCQueue(10)
for i := 0; i < 10; i++ {
q.Enqueue(i)
Expand All @@ -32,7 +32,7 @@ func TestQueueEnqueueDequeue(t *testing.T) {
}
}

func TestQueueEnqueueBlocksOnFull(t *testing.T) {
func TestMPMCQueueEnqueueBlocksOnFull(t *testing.T) {
q := NewMPMCQueue(1)
q.Enqueue("foo")
cdone := make(chan bool)
Expand All @@ -52,7 +52,7 @@ func TestQueueEnqueueBlocksOnFull(t *testing.T) {
<-cdone
}

func TestQueueDequeueBlocksOnEmpty(t *testing.T) {
func TestMPMCQueueDequeueBlocksOnEmpty(t *testing.T) {
q := NewMPMCQueue(2)
cdone := make(chan bool)
flag := int32(0)
Expand All @@ -69,7 +69,7 @@ func TestQueueDequeueBlocksOnEmpty(t *testing.T) {
<-cdone
}

func TestQueueTryEnqueueDequeue(t *testing.T) {
func TestMPMCQueueTryEnqueueDequeue(t *testing.T) {
q := NewMPMCQueue(10)
for i := 0; i < 10; i++ {
if !q.TryEnqueue(i) {
Expand All @@ -83,7 +83,7 @@ func TestQueueTryEnqueueDequeue(t *testing.T) {
}
}

func TestQueueTryEnqueueOnFull(t *testing.T) {
func TestMPMCQueueTryEnqueueOnFull(t *testing.T) {
q := NewMPMCQueue(1)
if !q.TryEnqueue("foo") {
t.Error("failed to enqueue initial item")
Expand All @@ -93,14 +93,14 @@ func TestQueueTryEnqueueOnFull(t *testing.T) {
}
}

func TestQueueTryDequeueBlocksOnEmpty(t *testing.T) {
// TestMPMCQueueTryDequeueOnEmpty verifies that TryDequeue reports failure
// (ok == false) when called on a freshly created queue that holds no items.
func TestMPMCQueueTryDequeueOnEmpty(t *testing.T) {
	q := NewMPMCQueue(2)
	if _, ok := q.TryDequeue(); ok {
		// The operation under test is a dequeue; the message previously
		// said "enqueue", which pointed at the wrong call on failure.
		t.Error("got success for dequeue on empty queue")
	}
}

func hammerQueueBlockingCalls(t *testing.T, gomaxprocs, numOps, numThreads int) {
func hammerMPMCQueueBlockingCalls(t *testing.T, gomaxprocs, numOps, numThreads int) {
runtime.GOMAXPROCS(gomaxprocs)
q := NewMPMCQueue(numThreads)
startwg := sync.WaitGroup{}
Expand Down Expand Up @@ -142,21 +142,21 @@ func hammerQueueBlockingCalls(t *testing.T, gomaxprocs, numOps, numThreads int)
}
}

func TestQueueBlockingCalls(t *testing.T) {
func TestMPMCQueueBlockingCalls(t *testing.T) {
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(-1))
n := 100
if testing.Short() {
n = 10
}
hammerQueueBlockingCalls(t, 1, 100*n, n)
hammerQueueBlockingCalls(t, 1, 1000*n, 10*n)
hammerQueueBlockingCalls(t, 4, 100*n, n)
hammerQueueBlockingCalls(t, 4, 1000*n, 10*n)
hammerQueueBlockingCalls(t, 8, 100*n, n)
hammerQueueBlockingCalls(t, 8, 1000*n, 10*n)
hammerMPMCQueueBlockingCalls(t, 1, 100*n, n)
hammerMPMCQueueBlockingCalls(t, 1, 1000*n, 10*n)
hammerMPMCQueueBlockingCalls(t, 4, 100*n, n)
hammerMPMCQueueBlockingCalls(t, 4, 1000*n, 10*n)
hammerMPMCQueueBlockingCalls(t, 8, 100*n, n)
hammerMPMCQueueBlockingCalls(t, 8, 1000*n, 10*n)
}

func hammerQueueNonBlockingCalls(t *testing.T, gomaxprocs, numOps, numThreads int) {
func hammerMPMCQueueNonBlockingCalls(t *testing.T, gomaxprocs, numOps, numThreads int) {
runtime.GOMAXPROCS(gomaxprocs)
q := NewMPMCQueue(numThreads)
startwg := sync.WaitGroup{}
Expand Down Expand Up @@ -209,18 +209,18 @@ func hammerQueueNonBlockingCalls(t *testing.T, gomaxprocs, numOps, numThreads in
}
}

func TestQueueNonBlockingCalls(t *testing.T) {
func TestMPMCQueueNonBlockingCalls(t *testing.T) {
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(-1))
n := 10
if testing.Short() {
n = 1
}
hammerQueueNonBlockingCalls(t, 1, n, n)
hammerQueueNonBlockingCalls(t, 2, 10*n, 2*n)
hammerQueueNonBlockingCalls(t, 4, 100*n, 4*n)
hammerMPMCQueueNonBlockingCalls(t, 1, n, n)
hammerMPMCQueueNonBlockingCalls(t, 2, 10*n, 2*n)
hammerMPMCQueueNonBlockingCalls(t, 4, 100*n, 4*n)
}

func benchmarkQueueProdCons(b *testing.B, queueSize, localWork int) {
func benchmarkMPMCQueue(b *testing.B, queueSize, localWork int) {
callsPerSched := queueSize
procs := runtime.GOMAXPROCS(-1) / 2
if procs == 0 {
Expand Down Expand Up @@ -265,15 +265,15 @@ func benchmarkQueueProdCons(b *testing.B, queueSize, localWork int) {
}
}

func BenchmarkQueueProdCons(b *testing.B) {
benchmarkQueueProdCons(b, 1000, 0)
func BenchmarkMPMCQueue(b *testing.B) {
benchmarkMPMCQueue(b, 1000, 0)
}

func BenchmarkQueueProdConsWork100(b *testing.B) {
benchmarkQueueProdCons(b, 1000, 100)
func BenchmarkMPMCQueueWork100(b *testing.B) {
benchmarkMPMCQueue(b, 1000, 100)
}

func benchmarkChanProdCons(b *testing.B, chanSize, localWork int) {
func benchmarkMPMCChan(b *testing.B, chanSize, localWork int) {
callsPerSched := chanSize
procs := runtime.GOMAXPROCS(-1) / 2
if procs == 0 {
Expand Down Expand Up @@ -318,10 +318,10 @@ func benchmarkChanProdCons(b *testing.B, chanSize, localWork int) {
}
}

func BenchmarkChanProdCons(b *testing.B) {
benchmarkChanProdCons(b, 1000, 0)
func BenchmarkMPMCChan(b *testing.B) {
benchmarkMPMCChan(b, 1000, 0)
}

func BenchmarkChanProdConsWork100(b *testing.B) {
benchmarkChanProdCons(b, 1000, 100)
func BenchmarkMPMCChanWork100(b *testing.B) {
benchmarkMPMCChan(b, 1000, 100)
}
Loading

0 comments on commit 2ef2c05

Please sign in to comment.