From ace0f0f74f9efcf6f83a907113446b7497eab72d Mon Sep 17 00:00:00 2001 From: Andrei Pechkurov <37772591+puzpuzpuz@users.noreply.github.com> Date: Mon, 21 Aug 2023 11:02:15 +0300 Subject: [PATCH] Add concurrent queue with generics support (MPMCQueueOf) (#104) --- README.md | 12 +- mpmcqueueof.go | 150 ++++++++++++++++++++ mpmcqueueof_test.go | 326 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 486 insertions(+), 2 deletions(-) create mode 100644 mpmcqueueof.go create mode 100644 mpmcqueueof_test.go diff --git a/README.md b/README.md index 54f5fdb..94dca04 100644 --- a/README.md +++ b/README.md @@ -99,12 +99,20 @@ q.Enqueue("foo") // optimistic insertion attempt; doesn't block inserted := q.TryEnqueue("bar") // consumer obtains an item from the queue -item := q.Dequeue() +item := q.Dequeue() // interface{} pointing to a string // optimistic obtain attempt; doesn't block item, ok := q.TryDequeue() ``` -Based on the algorithm from the [MPMCQueue](https://github.com/rigtorp/MPMCQueue) C++ library which in its turn references D.Vyukov's [MPMC queue](https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue). According to the following [classification](https://www.1024cores.net/home/lock-free-algorithms/queues), the queue is array-based, fails on overflow, provides causal FIFO, has blocking producers and consumers. +`MPMCQueueOf[I]` is an implementation with parametrized item type. It is available for Go 1.19 or later. + +```go +q := xsync.NewMPMCQueueOf[string](1024) +q.Enqueue("foo") +item := q.Dequeue() // string +``` + +The queue is based on the algorithm from the [MPMCQueue](https://github.com/rigtorp/MPMCQueue) C++ library which in its turn references D.Vyukov's [MPMC queue](https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue). According to the following [classification](https://www.1024cores.net/home/lock-free-algorithms/queues), the queue is array-based, fails on overflow, provides causal FIFO, has blocking producers and consumers. The idea of the algorithm is to allow parallelism for concurrent producers and consumers by introducing the notion of tickets, i.e. values of two counters, one per producers/consumers. An atomic increment of one of those counters is the only noticeable contention point in queue operations. The rest of the operation avoids contention on writes thanks to the turn-based read/write access for each of the queue items. diff --git a/mpmcqueueof.go b/mpmcqueueof.go new file mode 100644 index 0000000..38a8fa3 --- /dev/null +++ b/mpmcqueueof.go @@ -0,0 +1,150 @@ +//go:build go1.19 +// +build go1.19 + +package xsync + +import ( + "runtime" + "sync/atomic" + "unsafe" +) + +// A MPMCQueueOf is a bounded multi-producer multi-consumer concurrent +// queue. It's a generic version of MPMCQueue. +// +// MPMCQueue instances must be created with NewMPMCQueueOf function. +// A MPMCQueueOf must not be copied after first use. +// +// Based on the data structure from the following C++ library: +// https://github.com/rigtorp/MPMCQueue +type MPMCQueueOf[I any] struct { + cap uint64 + head uint64 + //lint:ignore U1000 prevents false sharing + hpad [cacheLineSize - 8]byte + tail uint64 + //lint:ignore U1000 prevents false sharing + tpad [cacheLineSize - 8]byte + slots []slotOfPadded[I] +} + +type slotOfPadded[I any] struct { + slotOf[I] + // Unfortunately, proper padding like the below one: + // + // pad [cacheLineSize - (unsafe.Sizeof(slotOf[I]{}) % cacheLineSize)]byte + // + // won't compile, so here we add a best-effort padding for items up to + // 56 bytes size. + //lint:ignore U1000 prevents false sharing + pad [cacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte +} + +type slotOf[I any] struct { + // atomic.Uint64 is used here to get proper 8 byte alignment on + // 32-bit archs. + turn atomic.Uint64 + item I +} + +// NewMPMCQueueOf creates a new MPMCQueueOf instance with the given +// capacity. +func NewMPMCQueueOf[I any](capacity int) *MPMCQueueOf[I] { + if capacity < 1 { + panic("capacity must be positive number") + } + return &MPMCQueueOf[I]{ + cap: uint64(capacity), + slots: make([]slotOfPadded[I], capacity), + } +} + +// Enqueue inserts the given item into the queue. +// Blocks, if the queue is full. +func (q *MPMCQueueOf[I]) Enqueue(item I) { + head := atomic.AddUint64(&q.head, 1) - 1 + slot := &q.slots[q.idx(head)] + turn := q.turn(head) * 2 + for slot.turn.Load() != turn { + runtime.Gosched() + } + slot.item = item + slot.turn.Store(turn + 1) +} + +// Dequeue retrieves and removes the item from the head of the queue. +// Blocks, if the queue is empty. +func (q *MPMCQueueOf[I]) Dequeue() I { + var zeroedI I + tail := atomic.AddUint64(&q.tail, 1) - 1 + slot := &q.slots[q.idx(tail)] + turn := q.turn(tail)*2 + 1 + for slot.turn.Load() != turn { + runtime.Gosched() + } + item := slot.item + slot.item = zeroedI + slot.turn.Store(turn + 1) + return item +} + +// TryEnqueue inserts the given item into the queue. Does not block +// and returns immediately. The result indicates that the queue isn't +// full and the item was inserted. +func (q *MPMCQueueOf[I]) TryEnqueue(item I) bool { + head := atomic.LoadUint64(&q.head) + for { + slot := &q.slots[q.idx(head)] + turn := q.turn(head) * 2 + if slot.turn.Load() == turn { + if atomic.CompareAndSwapUint64(&q.head, head, head+1) { + slot.item = item + slot.turn.Store(turn + 1) + return true + } + } else { + prevHead := head + head = atomic.LoadUint64(&q.head) + if head == prevHead { + return false + } + } + runtime.Gosched() + } +} + +// TryDequeue retrieves and removes the item from the head of the +// queue. Does not block and returns immediately. The ok result +// indicates that the queue isn't empty and an item was retrieved. +func (q *MPMCQueueOf[I]) TryDequeue() (item I, ok bool) { + tail := atomic.LoadUint64(&q.tail) + for { + slot := &q.slots[q.idx(tail)] + turn := q.turn(tail)*2 + 1 + if slot.turn.Load() == turn { + if atomic.CompareAndSwapUint64(&q.tail, tail, tail+1) { + var zeroedI I + item = slot.item + ok = true + slot.item = zeroedI + slot.turn.Store(turn + 1) + return + } + } else { + prevTail := tail + tail = atomic.LoadUint64(&q.tail) + if tail == prevTail { + return + } + } + runtime.Gosched() + } +} + +func (q *MPMCQueueOf[I]) idx(i uint64) uint64 { + return i % q.cap +} + +func (q *MPMCQueueOf[I]) turn(i uint64) uint64 { + return i / q.cap +} diff --git a/mpmcqueueof_test.go b/mpmcqueueof_test.go new file mode 100644 index 0000000..7f4e161 --- /dev/null +++ b/mpmcqueueof_test.go @@ -0,0 +1,326 @@ +//go:build go1.19 +// +build go1.19 + +// Copyright notice. The following tests are partially based on +// the following file from the Go Programming Language core repo: +// https://github.com/golang/go/blob/831f9376d8d730b16fb33dfd775618dffe13ce7a/src/runtime/chan_test.go + +package xsync_test + +import ( + "runtime" + "strconv" + "sync" + "sync/atomic" + "testing" + "time" + + . "github.com/puzpuzpuz/xsync/v2" +) + +func TestQueueOf_InvalidSize(t *testing.T) { + defer func() { recover() }() + NewMPMCQueueOf[int](0) + t.Fatal("no panic detected") +} + +func TestQueueOfEnqueueDequeueInt(t *testing.T) { + q := NewMPMCQueueOf[int](10) + for i := 0; i < 10; i++ { + q.Enqueue(i) + } + for i := 0; i < 10; i++ { + if got := q.Dequeue(); got != i { + t.Fatalf("got %v, want %d", got, i) + } + } +} + +func TestQueueOfEnqueueDequeueString(t *testing.T) { + q := NewMPMCQueueOf[string](10) + for i := 0; i < 10; i++ { + q.Enqueue(strconv.Itoa(i)) + } + for i := 0; i < 10; i++ { + if got := q.Dequeue(); got != strconv.Itoa(i) { + t.Fatalf("got %v, want %d", got, i) + } + } +} + +func TestQueueOfEnqueueDequeueStruct(t *testing.T) { + type foo struct { + bar int + baz int + } + q := NewMPMCQueueOf[foo](10) + for i := 0; i < 10; i++ { + q.Enqueue(foo{i, i}) + } + for i := 0; i < 10; i++ { + if got := q.Dequeue(); got.bar != i || got.baz != i { + t.Fatalf("got %v, want %d", got, i) + } + } +} + +func TestQueueOfEnqueueDequeueStructRef(t *testing.T) { + type foo struct { + bar int + baz int + } + q := NewMPMCQueueOf[*foo](11) + for i := 0; i < 10; i++ { + q.Enqueue(&foo{i, i}) + } + q.Enqueue(nil) + for i := 0; i < 10; i++ { + if got := q.Dequeue(); got.bar != i || got.baz != i { + t.Fatalf("got %v, want %d", got, i) + } + } + if last := q.Dequeue(); last != nil { + t.Fatalf("got %v, want nil", last) + } +} + +func TestQueueOfEnqueueBlocksOnFull(t *testing.T) { + q := NewMPMCQueueOf[string](1) + q.Enqueue("foo") + cdone := make(chan bool) + flag := int32(0) + go func() { + q.Enqueue("bar") + if atomic.LoadInt32(&flag) == 0 { + t.Error("enqueue on full queue didn't wait for dequeue") + } + cdone <- true + }() + time.Sleep(50 * time.Millisecond) + atomic.StoreInt32(&flag, 1) + if got := q.Dequeue(); got != "foo" { + t.Fatalf("got %v, want foo", got) + } + <-cdone +} + +func TestQueueOfDequeueBlocksOnEmpty(t *testing.T) { + q := NewMPMCQueueOf[string](2) + cdone := make(chan bool) + flag := int32(0) + go func() { + q.Dequeue() + if atomic.LoadInt32(&flag) == 0 { + t.Error("dequeue on empty queue didn't wait for enqueue") + } + cdone <- true + }() + time.Sleep(50 * time.Millisecond) + atomic.StoreInt32(&flag, 1) + q.Enqueue("foobar") + <-cdone +} + +func TestQueueOfTryEnqueueDequeue(t *testing.T) { + q := NewMPMCQueueOf[int](10) + for i := 0; i < 10; i++ { + if !q.TryEnqueue(i) { + t.Fatalf("failed to enqueue for %d", i) + } + } + for i := 0; i < 10; i++ { + if got, ok := q.TryDequeue(); !ok || got != i { + t.Fatalf("got %v, want %d, for status %v", got, i, ok) + } + } +} + +func TestQueueOfTryEnqueueOnFull(t *testing.T) { + q := NewMPMCQueueOf[string](1) + if !q.TryEnqueue("foo") { + t.Error("failed to enqueue initial item") + } + if q.TryEnqueue("bar") { + t.Error("got success for enqueue on full queue") + } +} + +func TestQueueOfTryDequeueBlocksOnEmpty(t *testing.T) { + q := NewMPMCQueueOf[int](2) + if _, ok := q.TryDequeue(); ok { + t.Error("got success for enqueue on empty queue") + } +} + +func hammerQueueOfBlockingCalls(t *testing.T, gomaxprocs, numOps, numThreads int) { + runtime.GOMAXPROCS(gomaxprocs) + q := NewMPMCQueueOf[int](numThreads) + startwg := sync.WaitGroup{} + startwg.Add(1) + csum := make(chan int, numThreads) + // Start producers. + for i := 0; i < numThreads; i++ { + go func(n int) { + startwg.Wait() + for j := n; j < numOps; j += numThreads { + q.Enqueue(j) + } + }(i) + } + // Start consumers. + for i := 0; i < numThreads; i++ { + go func(n int) { + startwg.Wait() + sum := 0 + for j := n; j < numOps; j += numThreads { + item := q.Dequeue() + sum += item + } + csum <- sum + }(i) + } + startwg.Done() + // Wait for all the sums from producers. + sum := 0 + for i := 0; i < numThreads; i++ { + s := <-csum + sum += s + } + // Assert the total sum. + expectedSum := numOps * (numOps - 1) / 2 + if sum != expectedSum { + t.Fatalf("sums don't match for %d num ops, %d num threads: got %d, want %d", + numOps, numThreads, sum, expectedSum) + } +} + +func TestQueueOfBlockingCalls(t *testing.T) { + defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(-1)) + n := 100 + if testing.Short() { + n = 10 + } + hammerQueueOfBlockingCalls(t, 1, 100*n, n) + hammerQueueOfBlockingCalls(t, 1, 1000*n, 10*n) + hammerQueueOfBlockingCalls(t, 4, 100*n, n) + hammerQueueOfBlockingCalls(t, 4, 1000*n, 10*n) + hammerQueueOfBlockingCalls(t, 8, 100*n, n) + hammerQueueOfBlockingCalls(t, 8, 1000*n, 10*n) +} + +func hammerQueueOfNonBlockingCalls(t *testing.T, gomaxprocs, numOps, numThreads int) { + runtime.GOMAXPROCS(gomaxprocs) + q := NewMPMCQueueOf[int](numThreads) + startwg := sync.WaitGroup{} + startwg.Add(1) + csum := make(chan int, numThreads) + // Start producers. + for i := 0; i < numThreads; i++ { + go func(n int) { + startwg.Wait() + for j := n; j < numOps; j += numThreads { + for !q.TryEnqueue(j) { + // busy spin until success + } + } + }(i) + } + // Start consumers. + for i := 0; i < numThreads; i++ { + go func(n int) { + startwg.Wait() + sum := 0 + for j := n; j < numOps; j += numThreads { + var ( + item int + ok bool + ) + for { + // busy spin until success + if item, ok = q.TryDequeue(); ok { + sum += item + break + } + } + } + csum <- sum + }(i) + } + startwg.Done() + // Wait for all the sums from producers. + sum := 0 + for i := 0; i < numThreads; i++ { + s := <-csum + sum += s + } + // Assert the total sum. + expectedSum := numOps * (numOps - 1) / 2 + if sum != expectedSum { + t.Fatalf("sums don't match for %d num ops, %d num threads: got %d, want %d", + numOps, numThreads, sum, expectedSum) + } +} + +func TestQueueOfNonBlockingCalls(t *testing.T) { + defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(-1)) + n := 10 + if testing.Short() { + n = 1 + } + hammerQueueOfNonBlockingCalls(t, 1, n, n) + hammerQueueOfNonBlockingCalls(t, 2, 10*n, 2*n) + hammerQueueOfNonBlockingCalls(t, 4, 100*n, 4*n) +} + +func benchmarkQueueOfProdCons(b *testing.B, queueSize, localWork int) { + callsPerSched := queueSize + procs := runtime.GOMAXPROCS(-1) / 2 + if procs == 0 { + procs = 1 + } + N := int32(b.N / callsPerSched) + c := make(chan bool, 2*procs) + q := NewMPMCQueueOf[int](queueSize) + for p := 0; p < procs; p++ { + go func() { + foo := 0 + for atomic.AddInt32(&N, -1) >= 0 { + for g := 0; g < callsPerSched; g++ { + for i := 0; i < localWork; i++ { + foo *= 2 + foo /= 2 + } + q.Enqueue(1) + } + } + q.Enqueue(0) + c <- foo == 42 + }() + go func() { + foo := 0 + for { + v := q.Dequeue() + if v == 0 { + break + } + for i := 0; i < localWork; i++ { + foo *= 2 + foo /= 2 + } + } + c <- foo == 42 + }() + } + for p := 0; p < procs; p++ { + <-c + <-c + } +} + +func BenchmarkQueueOfProdCons(b *testing.B) { + benchmarkQueueOfProdCons(b, 1000, 0) +} + +func BenchmarkOfQueueProdConsWork100(b *testing.B) { + benchmarkQueueOfProdCons(b, 1000, 100) +}