diff --git a/README.md b/README.md index dac831b..3971553 100644 --- a/README.md +++ b/README.md @@ -100,28 +100,50 @@ m := NewMapOfWithHasher[int, int](func(i int, _ uint64) uint64 { When benchmarking concurrent maps, make sure to configure all of the competitors with the same hash function or, at least, take hash function performance into the consideration. +### SPSCQueue + +A `SPSCQueue` is a bounded single-producer single-consumer concurrent queue. This means that not more than a single goroutine must be publishing items to the queue while not more than a single goroutine must be consuming those items. + +```go +q := xsync.NewSPSCQueue(1024) +// producer inserts an item into the queue +// optimistic insertion attempt; doesn't block +inserted := q.TryEnqueue("bar") +// consumer obtains an item from the queue +// optimistic obtain attempt; doesn't block +item, ok := q.TryDequeue() // interface{} pointing to a string +``` + +`SPSCQueueOf[I]` is an implementation with parametrized item type. It is available for Go 1.19 or later. + +```go +q := xsync.NewSPSCQueueOf[string](1024) +inserted := q.TryEnqueue("foo") +item, ok := q.TryDequeue() // string +``` + +The queue is based on the data structure from this [article](https://rigtorp.se/ringbuffer). The idea is to reduce the CPU cache coherency traffic by keeping cached copies of read and write indexes used by producer and consumer respectively. + ### MPMCQueue A `MPMCQueue` is a bounded multi-producer multi-consumer concurrent queue. ```go q := xsync.NewMPMCQueue(1024) -// producer inserts an item into the queue -q.Enqueue("foo") +// producer optimistically inserts an item into the queue // optimistic insertion attempt; doesn't block inserted := q.TryEnqueue("bar") // consumer obtains an item from the queue -item := q.Dequeue() // interface{} pointing to a string // optimistic obtain attempt; doesn't block -item, ok := q.TryDequeue() +item, ok := q.TryDequeue() // interface{} pointing to a string ``` `MPMCQueueOf[I]` is an implementation with parametrized item type. It is available for Go 1.19 or later. ```go q := xsync.NewMPMCQueueOf[string](1024) -q.Enqueue("foo") -item := q.Dequeue() // string +inserted := q.TryEnqueue("foo") +item, ok := q.TryDequeue() // string ``` The queue is based on the algorithm from the [MPMCQueue](https://github.com/rigtorp/MPMCQueue) C++ library which in its turn references D.Vyukov's [MPMC queue](https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue). According to the following [classification](https://www.1024cores.net/home/lock-free-algorithms/queues), the queue is array-based, fails on overflow, provides causal FIFO, has blocking producers and consumers. diff --git a/map.go b/map.go index 092aa0b..eee9fcb 100644 --- a/map.go +++ b/map.go @@ -462,11 +462,11 @@ func (m *Map) doCompute( if b.next == nil { if emptyb != nil { // Insertion into an existing bucket. - var zeroedV interface{} - newValue, del := valueFn(zeroedV, false) + var zeroV interface{} + newValue, del := valueFn(zeroV, false) if del { unlockBucket(&rootb.topHashMutex) - return zeroedV, false + return zeroV, false } // First we update the value, then the key. // This is important for atomic snapshot states. @@ -486,8 +486,8 @@ func (m *Map) doCompute( goto compute_attempt } // Insertion into a new bucket. - var zeroedV interface{} - newValue, del := valueFn(zeroedV, false) + var zeroV interface{} + newValue, del := valueFn(zeroV, false) if del { unlockBucket(&rootb.topHashMutex) return newValue, false diff --git a/map_test.go b/map_test.go index 364d3b8..749f76d 100644 --- a/map_test.go +++ b/map_test.go @@ -363,11 +363,11 @@ func TestMapLoadOrCompute_FunctionCalledOnce(t *testing.T) { } func TestMapCompute(t *testing.T) { - var zeroedV interface{} + var zeroV interface{} m := NewMap() // Store a new value. v, ok := m.Compute("foobar", func(oldValue interface{}, loaded bool) (newValue interface{}, delete bool) { - if oldValue != zeroedV { + if oldValue != zeroV { t.Fatalf("oldValue should be empty interface{} when computing a new value: %d", oldValue) } if loaded { @@ -420,8 +420,8 @@ func TestMapCompute(t *testing.T) { } // Try to delete a non-existing value. Notice different key. v, ok = m.Compute("barbaz", func(oldValue interface{}, loaded bool) (newValue interface{}, delete bool) { - var zeroedV interface{} - if oldValue != zeroedV { + var zeroV interface{} + if oldValue != zeroV { t.Fatalf("oldValue should be empty interface{} when trying to delete a non-existing value: %d", oldValue) } if loaded { @@ -432,7 +432,7 @@ func TestMapCompute(t *testing.T) { delete = true return }) - if v != zeroedV { + if v != zeroV { t.Fatalf("v should be empty interface{} when trying to delete a non-existing value: %d", v) } if ok { diff --git a/mapof.go b/mapof.go index 9d8105e..39a1aa5 100644 --- a/mapof.go +++ b/mapof.go @@ -405,11 +405,11 @@ func (m *MapOf[K, V]) doCompute( if b.next == nil { if emptyb != nil { // Insertion into an existing bucket. - var zeroedV V - newValue, del := valueFn(zeroedV, false) + var zeroV V + newValue, del := valueFn(zeroV, false) if del { rootb.mu.Unlock() - return zeroedV, false + return zeroV, false } newe := new(entryOf[K, V]) newe.key = key @@ -429,8 +429,8 @@ func (m *MapOf[K, V]) doCompute( goto compute_attempt } // Insertion into a new bucket. - var zeroedV V - newValue, del := valueFn(zeroedV, false) + var zeroV V + newValue, del := valueFn(zeroV, false) if del { rootb.mu.Unlock() return newValue, false diff --git a/mpmcqueue.go b/mpmcqueue.go index 96584e6..c5fd262 100644 --- a/mpmcqueue.go +++ b/mpmcqueue.go @@ -50,6 +50,8 @@ func NewMPMCQueue(capacity int) *MPMCQueue { // Enqueue inserts the given item into the queue. // Blocks, if the queue is full. +// +// Deprecated: use TryEnqueue in combination with runtime.Gosched(). func (q *MPMCQueue) Enqueue(item interface{}) { head := atomic.AddUint64(&q.head, 1) - 1 slot := &q.slots[q.idx(head)] @@ -63,6 +65,8 @@ func (q *MPMCQueue) Enqueue(item interface{}) { // Dequeue retrieves and removes the item from the head of the queue. // Blocks, if the queue is empty. +// +// Deprecated: use TryDequeue in combination with runtime.Gosched(). func (q *MPMCQueue) Dequeue() interface{} { tail := atomic.AddUint64(&q.tail, 1) - 1 slot := &q.slots[q.idx(tail)] @@ -81,24 +85,16 @@ func (q *MPMCQueue) Dequeue() interface{} { // full and the item was inserted. func (q *MPMCQueue) TryEnqueue(item interface{}) bool { head := atomic.LoadUint64(&q.head) - for { - slot := &q.slots[q.idx(head)] - turn := q.turn(head) * 2 - if atomic.LoadUint64(&slot.turn) == turn { - if atomic.CompareAndSwapUint64(&q.head, head, head+1) { - slot.item = item - atomic.StoreUint64(&slot.turn, turn+1) - return true - } - } else { - prevHead := head - head = atomic.LoadUint64(&q.head) - if head == prevHead { - return false - } + slot := &q.slots[q.idx(head)] + turn := q.turn(head) * 2 + if atomic.LoadUint64(&slot.turn) == turn { + if atomic.CompareAndSwapUint64(&q.head, head, head+1) { + slot.item = item + atomic.StoreUint64(&slot.turn, turn+1) + return true } - runtime.Gosched() } + return false } // TryDequeue retrieves and removes the item from the head of the @@ -106,26 +102,18 @@ func (q *MPMCQueue) TryEnqueue(item interface{}) bool { // indicates that the queue isn't empty and an item was retrieved. func (q *MPMCQueue) TryDequeue() (item interface{}, ok bool) { tail := atomic.LoadUint64(&q.tail) - for { - slot := &q.slots[q.idx(tail)] - turn := q.turn(tail)*2 + 1 - if atomic.LoadUint64(&slot.turn) == turn { - if atomic.CompareAndSwapUint64(&q.tail, tail, tail+1) { - item = slot.item - ok = true - slot.item = nil - atomic.StoreUint64(&slot.turn, turn+1) - return - } - } else { - prevTail := tail - tail = atomic.LoadUint64(&q.tail) - if tail == prevTail { - return - } + slot := &q.slots[q.idx(tail)] + turn := q.turn(tail)*2 + 1 + if atomic.LoadUint64(&slot.turn) == turn { + if atomic.CompareAndSwapUint64(&q.tail, tail, tail+1) { + item = slot.item + ok = true + slot.item = nil + atomic.StoreUint64(&slot.turn, turn+1) + return } - runtime.Gosched() } + return } func (q *MPMCQueue) idx(i uint64) uint64 { diff --git a/mpmcqueue_test.go b/mpmcqueue_test.go index 2d3cb09..69a043e 100644 --- a/mpmcqueue_test.go +++ b/mpmcqueue_test.go @@ -14,13 +14,13 @@ import ( . "github.com/puzpuzpuz/xsync/v3" ) -func TestQueue_InvalidSize(t *testing.T) { +func TestMPMCQueue_InvalidSize(t *testing.T) { defer func() { recover() }() NewMPMCQueue(0) t.Fatal("no panic detected") } -func TestQueueEnqueueDequeue(t *testing.T) { +func TestMPMCQueueEnqueueDequeue(t *testing.T) { q := NewMPMCQueue(10) for i := 0; i < 10; i++ { q.Enqueue(i) @@ -32,7 +32,7 @@ func TestQueueEnqueueDequeue(t *testing.T) { } } -func TestQueueEnqueueBlocksOnFull(t *testing.T) { +func TestMPMCQueueEnqueueBlocksOnFull(t *testing.T) { q := NewMPMCQueue(1) q.Enqueue("foo") cdone := make(chan bool) @@ -52,7 +52,7 @@ func TestQueueEnqueueBlocksOnFull(t *testing.T) { <-cdone } -func TestQueueDequeueBlocksOnEmpty(t *testing.T) { +func TestMPMCQueueDequeueBlocksOnEmpty(t *testing.T) { q := NewMPMCQueue(2) cdone := make(chan bool) flag := int32(0) @@ -69,7 +69,7 @@ func TestQueueDequeueBlocksOnEmpty(t *testing.T) { <-cdone } -func TestQueueTryEnqueueDequeue(t *testing.T) { +func TestMPMCQueueTryEnqueueDequeue(t *testing.T) { q := NewMPMCQueue(10) for i := 0; i < 10; i++ { if !q.TryEnqueue(i) { @@ -83,7 +83,7 @@ func TestQueueTryEnqueueDequeue(t *testing.T) { } } -func TestQueueTryEnqueueOnFull(t *testing.T) { +func TestMPMCQueueTryEnqueueOnFull(t *testing.T) { q := NewMPMCQueue(1) if !q.TryEnqueue("foo") { t.Error("failed to enqueue initial item") @@ -93,14 +93,14 @@ func TestQueueTryEnqueueOnFull(t *testing.T) { } } -func TestQueueTryDequeueBlocksOnEmpty(t *testing.T) { +func TestMPMCQueueTryDequeueOnEmpty(t *testing.T) { q := NewMPMCQueue(2) if _, ok := q.TryDequeue(); ok { t.Error("got success for enqueue on empty queue") } } -func hammerQueueBlockingCalls(t *testing.T, gomaxprocs, numOps, numThreads int) { +func hammerMPMCQueueBlockingCalls(t *testing.T, gomaxprocs, numOps, numThreads int) { runtime.GOMAXPROCS(gomaxprocs) q := NewMPMCQueue(numThreads) startwg := sync.WaitGroup{} @@ -142,21 +142,21 @@ func hammerQueueBlockingCalls(t *testing.T, gomaxprocs, numOps, numThreads int) } } -func TestQueueBlockingCalls(t *testing.T) { +func TestMPMCQueueBlockingCalls(t *testing.T) { defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(-1)) n := 100 if testing.Short() { n = 10 } - hammerQueueBlockingCalls(t, 1, 100*n, n) - hammerQueueBlockingCalls(t, 1, 1000*n, 10*n) - hammerQueueBlockingCalls(t, 4, 100*n, n) - hammerQueueBlockingCalls(t, 4, 1000*n, 10*n) - hammerQueueBlockingCalls(t, 8, 100*n, n) - hammerQueueBlockingCalls(t, 8, 1000*n, 10*n) + hammerMPMCQueueBlockingCalls(t, 1, 100*n, n) + hammerMPMCQueueBlockingCalls(t, 1, 1000*n, 10*n) + hammerMPMCQueueBlockingCalls(t, 4, 100*n, n) + hammerMPMCQueueBlockingCalls(t, 4, 1000*n, 10*n) + hammerMPMCQueueBlockingCalls(t, 8, 100*n, n) + hammerMPMCQueueBlockingCalls(t, 8, 1000*n, 10*n) } -func hammerQueueNonBlockingCalls(t *testing.T, gomaxprocs, numOps, numThreads int) { +func hammerMPMCQueueNonBlockingCalls(t *testing.T, gomaxprocs, numOps, numThreads int) { runtime.GOMAXPROCS(gomaxprocs) q := NewMPMCQueue(numThreads) startwg := sync.WaitGroup{} @@ -209,18 +209,18 @@ func hammerQueueNonBlockingCalls(t *testing.T, gomaxprocs, numOps, numThreads in } } -func TestQueueNonBlockingCalls(t *testing.T) { +func TestMPMCQueueNonBlockingCalls(t *testing.T) { defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(-1)) n := 10 if testing.Short() { n = 1 } - hammerQueueNonBlockingCalls(t, 1, n, n) - hammerQueueNonBlockingCalls(t, 2, 10*n, 2*n) - hammerQueueNonBlockingCalls(t, 4, 100*n, 4*n) + hammerMPMCQueueNonBlockingCalls(t, 1, n, n) + hammerMPMCQueueNonBlockingCalls(t, 2, 10*n, 2*n) + hammerMPMCQueueNonBlockingCalls(t, 4, 100*n, 4*n) } -func benchmarkQueueProdCons(b *testing.B, queueSize, localWork int) { +func benchmarkMPMCQueue(b *testing.B, queueSize, localWork int) { callsPerSched := queueSize procs := runtime.GOMAXPROCS(-1) / 2 if procs == 0 { @@ -265,15 +265,15 @@ func benchmarkQueueProdCons(b *testing.B, queueSize, localWork int) { } } -func BenchmarkQueueProdCons(b *testing.B) { - benchmarkQueueProdCons(b, 1000, 0) +func BenchmarkMPMCQueue(b *testing.B) { + benchmarkMPMCQueue(b, 1000, 0) } -func BenchmarkQueueProdConsWork100(b *testing.B) { - benchmarkQueueProdCons(b, 1000, 100) +func BenchmarkMPMCQueueWork100(b *testing.B) { + benchmarkMPMCQueue(b, 1000, 100) } -func benchmarkChanProdCons(b *testing.B, chanSize, localWork int) { +func benchmarkMPMCChan(b *testing.B, chanSize, localWork int) { callsPerSched := chanSize procs := runtime.GOMAXPROCS(-1) / 2 if procs == 0 { @@ -318,10 +318,10 @@ func benchmarkChanProdCons(b *testing.B, chanSize, localWork int) { } } -func BenchmarkChanProdCons(b *testing.B) { - benchmarkChanProdCons(b, 1000, 0) +func BenchmarkMPMCChan(b *testing.B) { + benchmarkMPMCChan(b, 1000, 0) } -func BenchmarkChanProdConsWork100(b *testing.B) { - benchmarkChanProdCons(b, 1000, 100) +func BenchmarkMPMCChanWork100(b *testing.B) { + benchmarkMPMCChan(b, 1000, 100) } diff --git a/mpmcqueueof.go b/mpmcqueueof.go index 38a8fa3..3f7e4cc 100644 --- a/mpmcqueueof.go +++ b/mpmcqueueof.go @@ -12,7 +12,7 @@ import ( // A MPMCQueueOf is a bounded multi-producer multi-consumer concurrent // queue. It's a generic version of MPMCQueue. // -// MPMCQueue instances must be created with NewMPMCQueueOf function. +// MPMCQueueOf instances must be created with NewMPMCQueueOf function. // A MPMCQueueOf must not be copied after first use. // // Based on the data structure from the following C++ library: @@ -61,6 +61,8 @@ func NewMPMCQueueOf[I any](capacity int) *MPMCQueueOf[I] { // Enqueue inserts the given item into the queue. // Blocks, if the queue is full. +// +// Deprecated: use TryEnqueue in combination with runtime.Gosched(). func (q *MPMCQueueOf[I]) Enqueue(item I) { head := atomic.AddUint64(&q.head, 1) - 1 slot := &q.slots[q.idx(head)] @@ -74,8 +76,10 @@ func (q *MPMCQueueOf[I]) Enqueue(item I) { // Dequeue retrieves and removes the item from the head of the queue. // Blocks, if the queue is empty. +// +// Deprecated: use TryDequeue in combination with runtime.Gosched(). func (q *MPMCQueueOf[I]) Dequeue() I { - var zeroedI I + var zeroI I tail := atomic.AddUint64(&q.tail, 1) - 1 slot := &q.slots[q.idx(tail)] turn := q.turn(tail)*2 + 1 @@ -83,7 +87,7 @@ func (q *MPMCQueueOf[I]) Dequeue() I { runtime.Gosched() } item := slot.item - slot.item = zeroedI + slot.item = zeroI slot.turn.Store(turn + 1) return item } @@ -93,24 +97,16 @@ func (q *MPMCQueueOf[I]) Dequeue() I { // full and the item was inserted. func (q *MPMCQueueOf[I]) TryEnqueue(item I) bool { head := atomic.LoadUint64(&q.head) - for { - slot := &q.slots[q.idx(head)] - turn := q.turn(head) * 2 - if slot.turn.Load() == turn { - if atomic.CompareAndSwapUint64(&q.head, head, head+1) { - slot.item = item - slot.turn.Store(turn + 1) - return true - } - } else { - prevHead := head - head = atomic.LoadUint64(&q.head) - if head == prevHead { - return false - } + slot := &q.slots[q.idx(head)] + turn := q.turn(head) * 2 + if slot.turn.Load() == turn { + if atomic.CompareAndSwapUint64(&q.head, head, head+1) { + slot.item = item + slot.turn.Store(turn + 1) + return true } - runtime.Gosched() } + return false } // TryDequeue retrieves and removes the item from the head of the @@ -118,27 +114,19 @@ func (q *MPMCQueueOf[I]) TryEnqueue(item I) bool { // indicates that the queue isn't empty and an item was retrieved. func (q *MPMCQueueOf[I]) TryDequeue() (item I, ok bool) { tail := atomic.LoadUint64(&q.tail) - for { - slot := &q.slots[q.idx(tail)] - turn := q.turn(tail)*2 + 1 - if slot.turn.Load() == turn { - if atomic.CompareAndSwapUint64(&q.tail, tail, tail+1) { - var zeroedI I - item = slot.item - ok = true - slot.item = zeroedI - slot.turn.Store(turn + 1) - return - } - } else { - prevTail := tail - tail = atomic.LoadUint64(&q.tail) - if tail == prevTail { - return - } + slot := &q.slots[q.idx(tail)] + turn := q.turn(tail)*2 + 1 + if slot.turn.Load() == turn { + if atomic.CompareAndSwapUint64(&q.tail, tail, tail+1) { + var zeroI I + item = slot.item + ok = true + slot.item = zeroI + slot.turn.Store(turn + 1) + return } - runtime.Gosched() } + return } func (q *MPMCQueueOf[I]) idx(i uint64) uint64 { diff --git a/mpmcqueueof_test.go b/mpmcqueueof_test.go index aed7480..b226f6f 100644 --- a/mpmcqueueof_test.go +++ b/mpmcqueueof_test.go @@ -18,13 +18,13 @@ import ( . "github.com/puzpuzpuz/xsync/v3" ) -func TestQueueOf_InvalidSize(t *testing.T) { +func TestMPMCQueueOf_InvalidSize(t *testing.T) { defer func() { recover() }() NewMPMCQueueOf[int](0) t.Fatal("no panic detected") } -func TestQueueOfEnqueueDequeueInt(t *testing.T) { +func TestMPMCQueueOfEnqueueDequeueInt(t *testing.T) { q := NewMPMCQueueOf[int](10) for i := 0; i < 10; i++ { q.Enqueue(i) @@ -36,7 +36,7 @@ func TestQueueOfEnqueueDequeueInt(t *testing.T) { } } -func TestQueueOfEnqueueDequeueString(t *testing.T) { +func TestMPMCQueueOfEnqueueDequeueString(t *testing.T) { q := NewMPMCQueueOf[string](10) for i := 0; i < 10; i++ { q.Enqueue(strconv.Itoa(i)) @@ -48,7 +48,7 @@ func TestQueueOfEnqueueDequeueString(t *testing.T) { } } -func TestQueueOfEnqueueDequeueStruct(t *testing.T) { +func TestMPMCQueueOfEnqueueDequeueStruct(t *testing.T) { type foo struct { bar int baz int @@ -64,7 +64,7 @@ func TestQueueOfEnqueueDequeueStruct(t *testing.T) { } } -func TestQueueOfEnqueueDequeueStructRef(t *testing.T) { +func TestMPMCQueueOfEnqueueDequeueStructRef(t *testing.T) { type foo struct { bar int baz int @@ -84,7 +84,7 @@ func TestQueueOfEnqueueDequeueStructRef(t *testing.T) { } } -func TestQueueOfEnqueueBlocksOnFull(t *testing.T) { +func TestMPMCQueueOfEnqueueBlocksOnFull(t *testing.T) { q := NewMPMCQueueOf[string](1) q.Enqueue("foo") cdone := make(chan bool) @@ -104,7 +104,7 @@ func TestQueueOfEnqueueBlocksOnFull(t *testing.T) { <-cdone } -func TestQueueOfDequeueBlocksOnEmpty(t *testing.T) { +func TestMPMCQueueOfDequeueBlocksOnEmpty(t *testing.T) { q := NewMPMCQueueOf[string](2) cdone := make(chan bool) flag := int32(0) @@ -121,7 +121,7 @@ func TestQueueOfDequeueBlocksOnEmpty(t *testing.T) { <-cdone } -func TestQueueOfTryEnqueueDequeue(t *testing.T) { +func TestMPMCQueueOfTryEnqueueDequeue(t *testing.T) { q := NewMPMCQueueOf[int](10) for i := 0; i < 10; i++ { if !q.TryEnqueue(i) { @@ -135,7 +135,7 @@ func TestQueueOfTryEnqueueDequeue(t *testing.T) { } } -func TestQueueOfTryEnqueueOnFull(t *testing.T) { +func TestMPMCQueueOfTryEnqueueOnFull(t *testing.T) { q := NewMPMCQueueOf[string](1) if !q.TryEnqueue("foo") { t.Error("failed to enqueue initial item") @@ -145,14 +145,14 @@ func TestQueueOfTryEnqueueOnFull(t *testing.T) { } } -func TestQueueOfTryDequeueBlocksOnEmpty(t *testing.T) { +func TestMPMCQueueOfTryDequeueOnEmpty(t *testing.T) { q := NewMPMCQueueOf[int](2) if _, ok := q.TryDequeue(); ok { t.Error("got success for enqueue on empty queue") } } -func hammerQueueOfBlockingCalls(t *testing.T, gomaxprocs, numOps, numThreads int) { +func hammerMPMCQueueOfBlockingCalls(t *testing.T, gomaxprocs, numOps, numThreads int) { runtime.GOMAXPROCS(gomaxprocs) q := NewMPMCQueueOf[int](numThreads) startwg := sync.WaitGroup{} @@ -194,21 +194,21 @@ func hammerQueueOfBlockingCalls(t *testing.T, gomaxprocs, numOps, numThreads int } } -func TestQueueOfBlockingCalls(t *testing.T) { +func TestMPMCQueueOfBlockingCalls(t *testing.T) { defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(-1)) n := 100 if testing.Short() { n = 10 } - hammerQueueOfBlockingCalls(t, 1, 100*n, n) - hammerQueueOfBlockingCalls(t, 1, 1000*n, 10*n) - hammerQueueOfBlockingCalls(t, 4, 100*n, n) - hammerQueueOfBlockingCalls(t, 4, 1000*n, 10*n) - hammerQueueOfBlockingCalls(t, 8, 100*n, n) - hammerQueueOfBlockingCalls(t, 8, 1000*n, 10*n) + hammerMPMCQueueOfBlockingCalls(t, 1, 100*n, n) + hammerMPMCQueueOfBlockingCalls(t, 1, 1000*n, 10*n) + hammerMPMCQueueOfBlockingCalls(t, 4, 100*n, n) + hammerMPMCQueueOfBlockingCalls(t, 4, 1000*n, 10*n) + hammerMPMCQueueOfBlockingCalls(t, 8, 100*n, n) + hammerMPMCQueueOfBlockingCalls(t, 8, 1000*n, 10*n) } -func hammerQueueOfNonBlockingCalls(t *testing.T, gomaxprocs, numOps, numThreads int) { +func hammerMPMCQueueOfNonBlockingCalls(t *testing.T, gomaxprocs, numOps, numThreads int) { runtime.GOMAXPROCS(gomaxprocs) q := NewMPMCQueueOf[int](numThreads) startwg := sync.WaitGroup{} @@ -261,18 +261,18 @@ func hammerQueueOfNonBlockingCalls(t *testing.T, gomaxprocs, numOps, numThreads } } -func TestQueueOfNonBlockingCalls(t *testing.T) { +func TestMPMCQueueOfNonBlockingCalls(t *testing.T) { defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(-1)) n := 10 if testing.Short() { n = 1 } - hammerQueueOfNonBlockingCalls(t, 1, n, n) - hammerQueueOfNonBlockingCalls(t, 2, 10*n, 2*n) - hammerQueueOfNonBlockingCalls(t, 4, 100*n, 4*n) + hammerMPMCQueueOfNonBlockingCalls(t, 1, n, n) + hammerMPMCQueueOfNonBlockingCalls(t, 2, 10*n, 2*n) + hammerMPMCQueueOfNonBlockingCalls(t, 4, 100*n, 4*n) } -func benchmarkQueueOfProdCons(b *testing.B, queueSize, localWork int) { +func benchmarkMPMCQueueOf(b *testing.B, queueSize, localWork int) { callsPerSched := queueSize procs := runtime.GOMAXPROCS(-1) / 2 if procs == 0 { @@ -317,10 +317,10 @@ func benchmarkQueueOfProdCons(b *testing.B, queueSize, localWork int) { } } -func BenchmarkQueueOfProdCons(b *testing.B) { - benchmarkQueueOfProdCons(b, 1000, 0) +func BenchmarkMPMCQueueOf(b *testing.B) { + benchmarkMPMCQueueOf(b, 1000, 0) } -func BenchmarkOfQueueProdConsWork100(b *testing.B) { - benchmarkQueueOfProdCons(b, 1000, 100) +func BenchmarkMPMCQueueOfWork100(b *testing.B) { + benchmarkMPMCQueueOf(b, 1000, 100) } diff --git a/spscqueue.go b/spscqueue.go new file mode 100644 index 0000000..d370b22 --- /dev/null +++ b/spscqueue.go @@ -0,0 +1,92 @@ +package xsync + +import ( + "sync/atomic" +) + +// A SPSCQueue is a bounded single-producer single-consumer concurrent +// queue. This means that not more than a single goroutine must be +// publishing items to the queue while not more than a single goroutine +// must be consuming those items. +// +// SPSCQueue instances must be created with NewSPSCQueue function. +// A SPSCQueue must not be copied after first use. +// +// Based on the data structure from the following article: +// https://rigtorp.se/ringbuffer/ +type SPSCQueue struct { + cap uint64 + p_idx uint64 + //lint:ignore U1000 prevents false sharing + pad0 [cacheLineSize - 8]byte + p_cached_idx uint64 + //lint:ignore U1000 prevents false sharing + pad1 [cacheLineSize - 8]byte + c_idx uint64 + //lint:ignore U1000 prevents false sharing + pad2 [cacheLineSize - 8]byte + c_cached_idx uint64 + //lint:ignore U1000 prevents false sharing + pad3 [cacheLineSize - 8]byte + items []interface{} +} + +// NewSPSCQueue creates a new SPSCQueue instance with the given +// capacity. +func NewSPSCQueue(capacity int) *SPSCQueue { + if capacity < 1 { + panic("capacity must be positive number") + } + return &SPSCQueue{ + cap: uint64(capacity + 1), + items: make([]interface{}, capacity+1), + } +} + +// TryEnqueue inserts the given item into the queue. Does not block +// and returns immediately. The result indicates that the queue isn't +// full and the item was inserted. +func (q *SPSCQueue) TryEnqueue(item interface{}) bool { + // relaxed memory order would be enough here + idx := atomic.LoadUint64(&q.p_idx) + next_idx := idx + 1 + if next_idx == q.cap { + next_idx = 0 + } + cached_idx := q.c_cached_idx + if next_idx == cached_idx { + cached_idx = atomic.LoadUint64(&q.c_idx) + q.c_cached_idx = cached_idx + if next_idx == cached_idx { + return false + } + } + q.items[idx] = item + atomic.StoreUint64(&q.p_idx, next_idx) + return true +} + +// TryDequeue retrieves and removes the item from the head of the +// queue. Does not block and returns immediately. The ok result +// indicates that the queue isn't empty and an item was retrieved. +func (q *SPSCQueue) TryDequeue() (item interface{}, ok bool) { + // relaxed memory order would be enough here + idx := atomic.LoadUint64(&q.c_idx) + cached_idx := q.p_cached_idx + if idx == cached_idx { + cached_idx = atomic.LoadUint64(&q.p_idx) + q.p_cached_idx = cached_idx + if idx == cached_idx { + return + } + } + item = q.items[idx] + q.items[idx] = nil + ok = true + next_idx := idx + 1 + if next_idx == q.cap { + next_idx = 0 + } + atomic.StoreUint64(&q.c_idx, next_idx) + return +} diff --git a/spscqueue_test.go b/spscqueue_test.go new file mode 100644 index 0000000..cdc67f4 --- /dev/null +++ b/spscqueue_test.go @@ -0,0 +1,103 @@ +// Copyright notice. The following tests are partially based on +// the following file from the Go Programming Language core repo: +// https://github.com/golang/go/blob/831f9376d8d730b16fb33dfd775618dffe13ce7a/src/runtime/chan_test.go + +package xsync_test + +import ( + "sync" + "testing" + + . "github.com/puzpuzpuz/xsync/v3" +) + +func TestSPSCQueue_InvalidSize(t *testing.T) { + defer func() { recover() }() + NewSPSCQueue(0) + t.Fatal("no panic detected") +} + +func TestSPSCQueueTryEnqueueDequeue(t *testing.T) { + q := NewSPSCQueue(10) + for i := 0; i < 10; i++ { + if !q.TryEnqueue(i) { + t.Fatal("TryEnqueue failed") + } + } + for i := 0; i < 10; i++ { + if got, ok := q.TryDequeue(); !ok || got != i { + t.Fatalf("%v: got %v, want %d", ok, got, i) + } + } +} + +func TestSPSCQueueTryEnqueueOnFull(t *testing.T) { + q := NewSPSCQueue(1) + if !q.TryEnqueue("foo") { + t.Error("failed to enqueue initial item") + } + if q.TryEnqueue("bar") { + t.Error("got success for enqueue on full queue") + } +} + +func TestSPSCQueueTryDequeueOnEmpty(t *testing.T) { + q := NewSPSCQueue(2) + if _, ok := q.TryDequeue(); ok { + t.Error("got success for enqueue on empty queue") + } +} + +func hammerSPSCQueueNonBlockingCalls(t *testing.T, cap, numOps int) { + q := NewSPSCQueue(cap) + startwg := sync.WaitGroup{} + startwg.Add(1) + csum := make(chan int, 2) + // Start producer. + go func() { + startwg.Wait() + for j := 0; j < numOps; j++ { + for !q.TryEnqueue(j) { + // busy spin until success + } + } + }() + // Start consumer. + go func() { + startwg.Wait() + sum := 0 + for j := 0; j < numOps; j++ { + var ( + item interface{} + ok bool + ) + for { + // busy spin until success + if item, ok = q.TryDequeue(); ok { + sum += item.(int) + break + } + } + } + csum <- sum + }() + startwg.Done() + // Wait for all the sum from the producer. + sum := <-csum + // Assert the total sum. + expectedSum := numOps * (numOps - 1) / 2 + if sum != expectedSum { + t.Fatalf("sums don't match for %d num ops: got %d, want %d", + numOps, sum, expectedSum) + } +} + +func TestSPSCQueueNonBlockingCalls(t *testing.T) { + n := 10 + if testing.Short() { + n = 1 + } + hammerSPSCQueueNonBlockingCalls(t, 1, n) + hammerSPSCQueueNonBlockingCalls(t, 2, 2*n) + hammerSPSCQueueNonBlockingCalls(t, 4, 4*n) +} diff --git a/spscqueueof.go b/spscqueueof.go new file mode 100644 index 0000000..cf3a13b --- /dev/null +++ b/spscqueueof.go @@ -0,0 +1,96 @@ +//go:build go1.19 +// +build go1.19 + +package xsync + +import ( + "sync/atomic" +) + +// A SPSCQueueOf is a bounded single-producer single-consumer concurrent +// queue. This means that not more than a single goroutine must be +// publishing items to the queue while not more than a single goroutine +// must be consuming those items. +// +// SPSCQueueOf instances must be created with NewSPSCQueueOf function. +// A SPSCQueueOf must not be copied after first use. +// +// Based on the data structure from the following article: +// https://rigtorp.se/ringbuffer/ +type SPSCQueueOf[I any] struct { + cap uint64 + p_idx uint64 + //lint:ignore U1000 prevents false sharing + pad0 [cacheLineSize - 8]byte + p_cached_idx uint64 + //lint:ignore U1000 prevents false sharing + pad1 [cacheLineSize - 8]byte + c_idx uint64 + //lint:ignore U1000 prevents false sharing + pad2 [cacheLineSize - 8]byte + c_cached_idx uint64 + //lint:ignore U1000 prevents false sharing + pad3 [cacheLineSize - 8]byte + items []I +} + +// NewSPSCQueueOf creates a new SPSCQueueOf instance with the given +// capacity. +func NewSPSCQueueOf[I any](capacity int) *SPSCQueueOf[I] { + if capacity < 1 { + panic("capacity must be positive number") + } + return &SPSCQueueOf[I]{ + cap: uint64(capacity + 1), + items: make([]I, capacity+1), + } +} + +// TryEnqueue inserts the given item into the queue. Does not block +// and returns immediately. The result indicates that the queue isn't +// full and the item was inserted. +func (q *SPSCQueueOf[I]) TryEnqueue(item I) bool { + // relaxed memory order would be enough here + idx := atomic.LoadUint64(&q.p_idx) + next_idx := idx + 1 + if next_idx == q.cap { + next_idx = 0 + } + cached_idx := q.c_cached_idx + if next_idx == cached_idx { + cached_idx = atomic.LoadUint64(&q.c_idx) + q.c_cached_idx = cached_idx + if next_idx == cached_idx { + return false + } + } + q.items[idx] = item + atomic.StoreUint64(&q.p_idx, next_idx) + return true +} + +// TryDequeue retrieves and removes the item from the head of the +// queue. Does not block and returns immediately. The ok result +// indicates that the queue isn't empty and an item was retrieved. +func (q *SPSCQueueOf[I]) TryDequeue() (item I, ok bool) { + // relaxed memory order would be enough here + idx := atomic.LoadUint64(&q.c_idx) + cached_idx := q.p_cached_idx + if idx == cached_idx { + cached_idx = atomic.LoadUint64(&q.p_idx) + q.p_cached_idx = cached_idx + if idx == cached_idx { + return + } + } + var zeroI I + item = q.items[idx] + q.items[idx] = zeroI + ok = true + next_idx := idx + 1 + if next_idx == q.cap { + next_idx = 0 + } + atomic.StoreUint64(&q.c_idx, next_idx) + return +} diff --git a/spscqueueof_test.go b/spscqueueof_test.go new file mode 100644 index 0000000..1a8432d --- /dev/null +++ b/spscqueueof_test.go @@ -0,0 +1,281 @@ +//go:build go1.19 +// +build go1.19 + +// Copyright notice. The following tests are partially based on +// the following file from the Go Programming Language core repo: +// https://github.com/golang/go/blob/831f9376d8d730b16fb33dfd775618dffe13ce7a/src/runtime/chan_test.go + +package xsync_test + +import ( + "runtime" + "strconv" + "sync" + "sync/atomic" + "testing" + + . "github.com/puzpuzpuz/xsync/v3" +) + +func TestSPSCQueueOf_InvalidSize(t *testing.T) { + defer func() { recover() }() + NewSPSCQueueOf[int](0) + t.Fatal("no panic detected") +} + +func TestSPSCQueueOfTryEnqueueDequeueInt(t *testing.T) { + q := NewSPSCQueueOf[int](10) + for i := 0; i < 10; i++ { + if !q.TryEnqueue(i) { + t.Fatal("TryEnqueue failed") + } + } + for i := 0; i < 10; i++ { + if got, ok := q.TryDequeue(); !ok || got != i { + t.Fatalf("%v: got %v, want %d", ok, got, i) + } + } +} + +func TestSPSCQueueOfTryEnqueueDequeueString(t *testing.T) { + q := NewSPSCQueueOf[string](10) + for i := 0; i < 10; i++ { + if !q.TryEnqueue(strconv.Itoa(i)) { + t.Fatal("TryEnqueue failed") + } + } + for i := 0; i < 10; i++ { + if got, ok := q.TryDequeue(); !ok || got != strconv.Itoa(i) { + t.Fatalf("%v: got %v, want %d", ok, got, i) + } + } +} + +func TestSPSCQueueOfTryEnqueueDequeueStruct(t *testing.T) { + type foo struct { + bar int + baz int + } + q := NewSPSCQueueOf[foo](10) + for i := 0; i < 10; i++ { + if !q.TryEnqueue(foo{i, i}) { + t.Fatal("TryEnqueue failed") + } + } + for i := 0; i < 10; i++ { + if got, ok := q.TryDequeue(); !ok || got.bar != i || got.baz != i { + t.Fatalf("%v: got %v, want %d", ok, got, i) + } + } +} + +func TestSPSCQueueOfTryEnqueueDequeueStructRef(t *testing.T) { + type foo struct { + bar int + baz int + } + q := NewSPSCQueueOf[*foo](11) + for i := 0; i < 10; i++ { + if !q.TryEnqueue(&foo{i, i}) { + t.Fatal("TryEnqueue failed") + } + } + if !q.TryEnqueue(nil) { + t.Fatal("TryEnqueue with nil failed") + } + for i := 0; i < 10; i++ { + if got, ok := q.TryDequeue(); !ok || got.bar != i || got.baz != i { + t.Fatalf("%v: got %v, want %d", ok, got, i) + } + } + if last, ok := q.TryDequeue(); !ok || last != nil { + t.Fatalf("%v: got %v, want nil", ok, last) + } +} + +func TestSPSCQueueOfTryEnqueueDequeue(t *testing.T) { + q := NewSPSCQueueOf[int](10) + for i := 0; i < 10; i++ { + if !q.TryEnqueue(i) { + t.Fatalf("failed to enqueue for %d", i) + } + } + for i := 0; i < 10; i++ { + if got, ok := q.TryDequeue(); !ok || got != i { + t.Fatalf("got %v, want %d, for status %v", got, i, ok) + } + } +} + +func TestSPSCQueueOfTryEnqueueOnFull(t *testing.T) { + q := NewSPSCQueueOf[string](1) + if !q.TryEnqueue("foo") { + t.Error("failed to enqueue initial item") + } + if q.TryEnqueue("bar") { + t.Error("got success for enqueue on full queue") + } +} + +func TestSPSCQueueOfTryDequeueOnEmpty(t *testing.T) { + q := NewSPSCQueueOf[int](2) + if _, ok := q.TryDequeue(); ok { + t.Error("got success for enqueue on empty queue") + } +} + +func hammerSPSCQueueOfNonBlockingCalls(t *testing.T, cap, numOps int) { + q := NewSPSCQueueOf[int](cap) + startwg := sync.WaitGroup{} + startwg.Add(1) + csum := make(chan int, 2) + // Start producer. + go func() { + startwg.Wait() + for j := 0; j < numOps; j++ { + for !q.TryEnqueue(j) { + // busy spin until success + } + } + }() + // Start consumer. + go func() { + startwg.Wait() + sum := 0 + for j := 0; j < numOps; j++ { + var ( + item int + ok bool + ) + for { + // busy spin until success + if item, ok = q.TryDequeue(); ok { + sum += item + break + } + } + } + csum <- sum + }() + startwg.Done() + // Wait for all the sum from the producer. + sum := <-csum + // Assert the total sum. + expectedSum := numOps * (numOps - 1) / 2 + if sum != expectedSum { + t.Fatalf("sums don't match for %d num ops: got %d, want %d", + numOps, sum, expectedSum) + } +} + +func TestSPSCQueueOfNonBlockingCalls(t *testing.T) { + n := 10 + if testing.Short() { + n = 1 + } + hammerSPSCQueueOfNonBlockingCalls(t, 1, n) + hammerSPSCQueueOfNonBlockingCalls(t, 2, 2*n) + hammerSPSCQueueOfNonBlockingCalls(t, 4, 4*n) +} + +func benchmarkSPSCQueueOfProdCons(b *testing.B, queueSize, localWork int) { + callsPerSched := queueSize + N := int32(b.N / callsPerSched) + c := make(chan bool, 2) + q := NewSPSCQueueOf[int](queueSize) + + go func() { + foo := 0 + for atomic.AddInt32(&N, -1) >= 0 { + for g := 0; g < callsPerSched; g++ { + for i := 0; i < localWork; i++ { + foo *= 2 + foo /= 2 + } + if !q.TryEnqueue(1) { + runtime.Gosched() + } + } + } + q.TryEnqueue(0) + c <- foo == 42 + }() + + go func() { + foo := 0 + for { + v, ok := q.TryDequeue() + if ok { + if v == 0 { + break + } + for i := 0; i < localWork; i++ { + foo *= 2 + foo /= 2 + } + } else { + runtime.Gosched() + } + } + c <- foo == 42 + }() + + <-c + <-c +} + +func BenchmarkSPSCQueueOfProdCons(b *testing.B) { + benchmarkSPSCQueueOfProdCons(b, 1000, 0) +} + +func BenchmarkSPSCQueueOfProdConsWork100(b *testing.B) { + benchmarkSPSCQueueOfProdCons(b, 1000, 100) +} + +func benchmarkSPSCChan(b *testing.B, chanSize, localWork int) { + callsPerSched := chanSize + N := int32(b.N / callsPerSched) + c := make(chan bool, 2) + myc := make(chan int, chanSize) + + go func() { + foo := 0 + for atomic.AddInt32(&N, -1) >= 0 { + for g := 0; g < callsPerSched; g++ { + for i := 0; i < localWork; i++ { + foo *= 2 + foo /= 2 + } + myc <- 1 + } + } + myc <- 0 + c <- foo == 42 + }() + + go func() { + foo := 0 + for { + v := <-myc + if v == 0 { + break + } + for i := 0; i < localWork; i++ { + foo *= 2 + foo /= 2 + } + } + c <- foo == 42 + }() + + <-c + <-c +} + +func BenchmarkSPSCChan(b *testing.B) { + benchmarkSPSCChan(b, 1000, 0) +} + +func BenchmarkSPSCChanWork100(b *testing.B) { + benchmarkSPSCChan(b, 1000, 100) +}