Skip to content

Commit

Permalink
Fix optimistic RBMutex methods and add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
puzpuzpuz committed Jul 14, 2024
1 parent f63979d commit 0cba9ad
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 38 deletions.
76 changes: 44 additions & 32 deletions rbmutex.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,22 +64,34 @@ func NewRBMutex() *RBMutex {
return &mu
}

// TryRLock tries to retrieve a reader lock token via
// the fast path or gives up returning nil.
// TryRLock tries to lock m for reading without blocking.
// When TryRLock succeeds, it returns a reader token.
// In case of a failure, a nil is returned.
func (mu *RBMutex) TryRLock() *RToken {
if _, mux := mu.fastTryRlock(); mux != nil {
if mux := mu.fastRlock(); mux != nil {
return mux
}
return nil
}

// TryLock tries to acquire a write lock without blocking,
// exposing the underlying TryLock method of sync.RWMutex.
func (mu *RBMutex) TryLock() bool {
return mu.rw.TryLock()
// RLock locks m for reading and returns a reader token. The
// token must be used in the later RUnlock call.
//
// Should not be used for recursive read locking; a blocked Lock
// call excludes new readers from acquiring the lock.
func (mu *RBMutex) RLock() *RToken {
	// Fast path: reader-biased slot acquisition; a non-nil token
	// means the read lock is held without touching the RWMutex.
	if r := mu.fastRlock(); r != nil {
		return r
	}
	// Slow path.
	mu.rw.RLock()
	// If the bias was revoked and the inhibition window has expired,
	// re-enable reader bias so future readers can use the fast path.
	// NOTE(review): a nil token is returned here; RUnlock must accept
	// nil to mean "release via the underlying RWMutex" — confirm.
	if atomic.LoadInt32(&mu.rbias) == 0 && time.Now().After(mu.inhibitUntil) {
		atomic.StoreInt32(&mu.rbias, 1)
	}
	return nil
}

func (mu *RBMutex) fastTryRlock() (*rslot, *RToken) {
func (mu *RBMutex) fastRlock() *RToken {
if atomic.LoadInt32(&mu.rbias) == 1 {
t, ok := rtokenPool.Get().(*RToken)
if !ok {
Expand All @@ -95,36 +107,16 @@ func (mu *RBMutex) fastTryRlock() (*rslot, *RToken) {
if atomic.LoadInt32(&mu.rbias) == 1 {
// Hot path succeeded.
t.slot = slot
return rslot, t
return t
}
// The mutex is no longer reader biased. Roll back.
atomic.AddInt32(&rslot.mu, -1)
rtokenPool.Put(t)
return rslot, nil
return nil
}
// Contention detected. Give a try with the next slot.
}
}
return nil, nil
}

// RLock locks m for reading and returns a reader token. The
// token must be used in the later RUnlock call.
//
// Should not be used for recursive read locking; a blocked Lock
// call excludes new readers from acquiring the lock.
func (mu *RBMutex) RLock() *RToken {
rslot, r := mu.fastTryRlock()
if r != nil {
return r
}
if rslot != nil {
// The mutex is no longer reader biased. Go to the slow path.
atomic.AddInt32(&rslot.mu, -1)
}
// Slow path.
mu.rw.RLock()
if atomic.LoadInt32(&mu.rbias) == 0 && time.Now().After(mu.inhibitUntil) {
atomic.StoreInt32(&mu.rbias, 1)
}
return nil
}

Expand All @@ -143,6 +135,26 @@ func (mu *RBMutex) RUnlock(t *RToken) {
rtokenPool.Put(t)
}

// TryLock tries to lock m for writing without blocking.
// It returns true when the write lock was acquired, false otherwise.
func (mu *RBMutex) TryLock() bool {
	if mu.rw.TryLock() {
		if atomic.LoadInt32(&mu.rbias) == 1 {
			// Some readers may have acquired the lock. Let's check.
			// Revoke reader bias first so no new fast-path readers
			// can enter while the slots are being scanned, and start
			// the bias inhibition window.
			atomic.StoreInt32(&mu.rbias, 0)
			mu.inhibitUntil = time.Now()
			for i := 0; i < len(mu.rslots); i++ {
				if atomic.LoadInt32(&mu.rslots[i].mu) > 0 {
					// There is a reader. Roll back.
					// The RWMutex is released and false is returned;
					// rbias stays 0 until inhibitUntil passes.
					mu.rw.Unlock()
					return false
				}
			}
		}
		return true
	}
	return false
}

// Lock locks m for writing. If the lock is already locked for
// reading or writing, Lock blocks until the lock is available.
func (mu *RBMutex) Lock() {
Expand Down
110 changes: 104 additions & 6 deletions rbmutex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,43 @@ import (
)

func TestRBMutexSerialReader(t *testing.T) {
const numIters = 10
const numCalls = 10
mu := NewRBMutex()
var rtokens [numIters]*RToken
for i := 0; i < numIters; i++ {
rtokens[i] = mu.RLock()
for i := 0; i < 3; i++ {
var rtokens [numCalls]*RToken
for j := 0; j < numCalls; j++ {
rtokens[j] = mu.RLock()
}
for j := 0; j < numCalls; j++ {
mu.RUnlock(rtokens[j])
}
}
}

func TestRBMutexSerialOptimisticReader(t *testing.T) {
const numCalls = 10
mu := NewRBMutex()
for i := 0; i < 3; i++ {
var rtokens [numCalls]*RToken
for j := 0; j < numCalls; j++ {
rtokens[j] = mu.TryRLock()
if rtokens[j] == nil {
t.Fatalf("TryRLock failed for %d", j)
}
}
for j := 0; j < numCalls; j++ {
mu.RUnlock(rtokens[j])
}
}
for i := 0; i < numIters; i++ {
mu.RUnlock(rtokens[i])
}

// TestRBMutexSerialOptimisticWriter verifies that TryLock succeeds
// repeatedly on an uncontended mutex.
func TestRBMutexSerialOptimisticWriter(t *testing.T) {
	mu := NewRBMutex()
	for attempt := 0; attempt < 3; attempt++ {
		ok := mu.TryLock()
		if !ok {
			t.Fatal("TryLock failed")
		}
		mu.Unlock()
	}
}

Expand Down Expand Up @@ -132,6 +160,76 @@ func TestRBMutex(t *testing.T) {
hammerRBMutex(10, 5, n)
}

// optimisticReader repeatedly attempts TryRLock and, on success,
// bumps the shared activity counter to detect mutual-exclusion
// violations (a concurrent writer adds 10000). Signals cdone on exit.
func optimisticReader(mu *RBMutex, numIterations int, activity *int32, cdone chan bool) {
	for iter := 0; iter < numIterations; iter++ {
		tk := mu.TryRLock()
		if tk == nil {
			// Lock attempt failed; just move on to the next iteration.
			continue
		}
		n := atomic.AddInt32(activity, 1)
		if n < 1 || n >= 10000 {
			// A writer is active while we hold a read lock — invariant broken.
			mu.RUnlock(tk)
			panic(fmt.Sprintf("rlock(%d)\n", n))
		}
		// Busy-spin briefly to widen the window for races.
		for spin := 0; spin < 100; spin++ {
		}
		atomic.AddInt32(activity, -1)
		mu.RUnlock(tk)
	}
	cdone <- true
}

// optimisticWriter repeatedly attempts TryLock and, on success, adds
// 10000 to the shared activity counter; any other concurrent activity
// (readers or another writer) makes the sum differ from 10000 and is
// reported as a mutual-exclusion violation. Signals cdone on exit.
func optimisticWriter(mu *RBMutex, numIterations int, activity *int32, cdone chan bool) {
	for iter := 0; iter < numIterations; iter++ {
		if !mu.TryLock() {
			continue
		}
		n := atomic.AddInt32(activity, 10000)
		if n != 10000 {
			// Someone else holds the lock concurrently — invariant broken.
			mu.Unlock()
			panic(fmt.Sprintf("wlock(%d)\n", n))
		}
		// Busy-spin briefly to widen the window for races.
		for spin := 0; spin < 100; spin++ {
		}
		atomic.AddInt32(activity, -10000)
		mu.Unlock()
	}
	cdone <- true
}

// hammerOptimisticRBMutex stresses the optimistic (Try*) methods with
// two writers interleaved among numReaders readers, then waits for all
// goroutines to finish.
func hammerOptimisticRBMutex(gomaxprocs, numReaders, numIterations int) {
	runtime.GOMAXPROCS(gomaxprocs)
	// Number of active readers + 10000 * number of active writers.
	var activity int32
	mu := NewRBMutex()
	cdone := make(chan bool)
	// First writer, then half of the readers, the second writer,
	// and finally the remaining readers.
	go optimisticWriter(mu, numIterations, &activity, cdone)
	launched := 0
	for ; launched < numReaders/2; launched++ {
		go optimisticReader(mu, numIterations, &activity, cdone)
	}
	go optimisticWriter(mu, numIterations, &activity, cdone)
	for ; launched < numReaders; launched++ {
		go optimisticReader(mu, numIterations, &activity, cdone)
	}
	// Wait for the 2 writers and all readers to finish.
	for done := 0; done < 2+numReaders; done++ {
		<-cdone
	}
}

// TestRBMutex_Optimistic hammers the Try* methods under a range of
// GOMAXPROCS and reader-count combinations.
func TestRBMutex_Optimistic(t *testing.T) {
	const n = 1000
	defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(0))
	// Same configurations, in the same order, as the blocking variant.
	configs := []struct{ procs, readers int }{
		{1, 1}, {1, 3}, {1, 10},
		{4, 1}, {4, 3}, {4, 10},
		{10, 1}, {10, 3}, {10, 10},
		{10, 5},
	}
	for _, c := range configs {
		hammerOptimisticRBMutex(c.procs, c.readers, n)
	}
}

func benchmarkRBMutex(b *testing.B, parallelism, localWork, writeRatio int) {
mu := NewRBMutex()
b.SetParallelism(parallelism)
Expand Down

0 comments on commit 0cba9ad

Please sign in to comment.