-
Notifications
You must be signed in to change notification settings - Fork 43
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add concurrent queue with generics support (MPMCQueueOf) (#104)
- Loading branch information
Showing
3 changed files
with
486 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,150 @@ | ||
//go:build go1.19 | ||
// +build go1.19 | ||
|
||
package xsync | ||
|
||
import ( | ||
"runtime" | ||
"sync/atomic" | ||
"unsafe" | ||
) | ||
|
||
// A MPMCQueueOf is a bounded multi-producer multi-consumer concurrent | ||
// queue. It's a generic version of MPMCQueue. | ||
// | ||
// MPMCQueue instances must be created with NewMPMCQueueOf function. | ||
// A MPMCQueueOf must not be copied after first use. | ||
// | ||
// Based on the data structure from the following C++ library: | ||
// https://github.com/rigtorp/MPMCQueue | ||
type MPMCQueueOf[I any] struct { | ||
cap uint64 | ||
head uint64 | ||
//lint:ignore U1000 prevents false sharing | ||
hpad [cacheLineSize - 8]byte | ||
tail uint64 | ||
//lint:ignore U1000 prevents false sharing | ||
tpad [cacheLineSize - 8]byte | ||
slots []slotOfPadded[I] | ||
} | ||
|
||
type slotOfPadded[I any] struct { | ||
slotOf[I] | ||
// Unfortunately, proper padding like the below one: | ||
// | ||
// pad [cacheLineSize - (unsafe.Sizeof(slotOf[I]{}) % cacheLineSize)]byte | ||
// | ||
// won't compile, so here we add a best-effort padding for items up to | ||
// 56 bytes size. | ||
//lint:ignore U1000 prevents false sharing | ||
pad [cacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte | ||
} | ||
|
||
type slotOf[I any] struct { | ||
// atomic.Uint64 is used here to get proper 8 byte alignment on | ||
// 32-bit archs. | ||
turn atomic.Uint64 | ||
item I | ||
} | ||
|
||
// NewMPMCQueueOf creates a new MPMCQueueOf instance with the given | ||
// capacity. | ||
func NewMPMCQueueOf[I any](capacity int) *MPMCQueueOf[I] { | ||
if capacity < 1 { | ||
panic("capacity must be positive number") | ||
} | ||
return &MPMCQueueOf[I]{ | ||
cap: uint64(capacity), | ||
slots: make([]slotOfPadded[I], capacity), | ||
} | ||
} | ||
|
||
// Enqueue inserts the given item into the queue. | ||
// Blocks, if the queue is full. | ||
func (q *MPMCQueueOf[I]) Enqueue(item I) { | ||
head := atomic.AddUint64(&q.head, 1) - 1 | ||
slot := &q.slots[q.idx(head)] | ||
turn := q.turn(head) * 2 | ||
for slot.turn.Load() != turn { | ||
runtime.Gosched() | ||
} | ||
slot.item = item | ||
slot.turn.Store(turn + 1) | ||
} | ||
|
||
// Dequeue retrieves and removes the item from the head of the queue. | ||
// Blocks, if the queue is empty. | ||
func (q *MPMCQueueOf[I]) Dequeue() I { | ||
var zeroedI I | ||
tail := atomic.AddUint64(&q.tail, 1) - 1 | ||
slot := &q.slots[q.idx(tail)] | ||
turn := q.turn(tail)*2 + 1 | ||
for slot.turn.Load() != turn { | ||
runtime.Gosched() | ||
} | ||
item := slot.item | ||
slot.item = zeroedI | ||
slot.turn.Store(turn + 1) | ||
return item | ||
} | ||
|
||
// TryEnqueue inserts the given item into the queue. Does not block | ||
// and returns immediately. The result indicates that the queue isn't | ||
// full and the item was inserted. | ||
func (q *MPMCQueueOf[I]) TryEnqueue(item I) bool { | ||
head := atomic.LoadUint64(&q.head) | ||
for { | ||
slot := &q.slots[q.idx(head)] | ||
turn := q.turn(head) * 2 | ||
if slot.turn.Load() == turn { | ||
if atomic.CompareAndSwapUint64(&q.head, head, head+1) { | ||
slot.item = item | ||
slot.turn.Store(turn + 1) | ||
return true | ||
} | ||
} else { | ||
prevHead := head | ||
head = atomic.LoadUint64(&q.head) | ||
if head == prevHead { | ||
return false | ||
} | ||
} | ||
runtime.Gosched() | ||
} | ||
} | ||
|
||
// TryDequeue retrieves and removes the item from the head of the | ||
// queue. Does not block and returns immediately. The ok result | ||
// indicates that the queue isn't empty and an item was retrieved. | ||
func (q *MPMCQueueOf[I]) TryDequeue() (item I, ok bool) { | ||
tail := atomic.LoadUint64(&q.tail) | ||
for { | ||
slot := &q.slots[q.idx(tail)] | ||
turn := q.turn(tail)*2 + 1 | ||
if slot.turn.Load() == turn { | ||
if atomic.CompareAndSwapUint64(&q.tail, tail, tail+1) { | ||
var zeroedI I | ||
item = slot.item | ||
ok = true | ||
slot.item = zeroedI | ||
slot.turn.Store(turn + 1) | ||
return | ||
} | ||
} else { | ||
prevTail := tail | ||
tail = atomic.LoadUint64(&q.tail) | ||
if tail == prevTail { | ||
return | ||
} | ||
} | ||
runtime.Gosched() | ||
} | ||
} | ||
|
||
func (q *MPMCQueueOf[I]) idx(i uint64) uint64 { | ||
return i % q.cap | ||
} | ||
|
||
func (q *MPMCQueueOf[I]) turn(i uint64) uint64 { | ||
return i / q.cap | ||
} |
Oops, something went wrong.