Skip to content

Commit

Permalink
Fix optimistic RBMutex methods and add tests (#140)
Browse files Browse the repository at this point in the history
  • Loading branch information
puzpuzpuz authored Jul 14, 2024
1 parent f63979d commit f53fbc1
Show file tree
Hide file tree
Showing 2 changed files with 203 additions and 46 deletions.
89 changes: 54 additions & 35 deletions rbmutex.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,22 +64,41 @@ func NewRBMutex() *RBMutex {
return &mu
}

// TryRLock tries to retrieve a reader lock token via
// the fast path or gives up returning nil.
func (mu *RBMutex) TryRLock() *RToken {
if _, mux := mu.fastTryRlock(); mux != nil {
return mux
// TryRLock tries to lock m for reading without blocking.
// When TryRLock succeeds, it returns true and a reader token.
// In case of a failure, a false is returned.
func (mu *RBMutex) TryRLock() (bool, *RToken) {
if t := mu.fastRlock(); t != nil {
return true, t
}
return nil
// Optimistic slow path.
if mu.rw.TryRLock() {
if atomic.LoadInt32(&mu.rbias) == 0 && time.Now().After(mu.inhibitUntil) {
atomic.StoreInt32(&mu.rbias, 1)
}
return true, nil
}
return false, nil
}

// TryLock tries to acquire a write lock without blocking,
// exposing the underlying TryLock method of sync.RWMutex.
func (mu *RBMutex) TryLock() bool {
return mu.rw.TryLock()
// RLock locks m for reading and returns a reader token. The
// token must be used in the later RUnlock call.
//
// Should not be used for recursive read locking; a blocked Lock
// call excludes new readers from acquiring the lock.
func (mu *RBMutex) RLock() *RToken {
if t := mu.fastRlock(); t != nil {
return t
}
// Slow path.
mu.rw.RLock()
if atomic.LoadInt32(&mu.rbias) == 0 && time.Now().After(mu.inhibitUntil) {
atomic.StoreInt32(&mu.rbias, 1)
}
return nil
}

func (mu *RBMutex) fastTryRlock() (*rslot, *RToken) {
func (mu *RBMutex) fastRlock() *RToken {
if atomic.LoadInt32(&mu.rbias) == 1 {
t, ok := rtokenPool.Get().(*RToken)
if !ok {
Expand All @@ -95,36 +114,16 @@ func (mu *RBMutex) fastTryRlock() (*rslot, *RToken) {
if atomic.LoadInt32(&mu.rbias) == 1 {
// Hot path succeeded.
t.slot = slot
return rslot, t
return t
}
// The mutex is no longer reader biased. Roll back.
atomic.AddInt32(&rslot.mu, -1)
rtokenPool.Put(t)
return rslot, nil
return nil
}
// Contention detected. Give a try with the next slot.
}
}
return nil, nil
}

// RLock locks m for reading and returns a reader token. The
// token must be used in the later RUnlock call.
//
// Should not be used for recursive read locking; a blocked Lock
// call excludes new readers from acquiring the lock.
func (mu *RBMutex) RLock() *RToken {
rslot, r := mu.fastTryRlock()
if r != nil {
return r
}
if rslot != nil {
// The mutex is no longer reader biased. Go to the slow path.
atomic.AddInt32(&rslot.mu, -1)
}
// Slow path.
mu.rw.RLock()
if atomic.LoadInt32(&mu.rbias) == 0 && time.Now().After(mu.inhibitUntil) {
atomic.StoreInt32(&mu.rbias, 1)
}
return nil
}

Expand All @@ -143,6 +142,26 @@ func (mu *RBMutex) RUnlock(t *RToken) {
rtokenPool.Put(t)
}

// TryLock tries to lock m for writing without blocking.
func (mu *RBMutex) TryLock() bool {
if mu.rw.TryLock() {
if atomic.LoadInt32(&mu.rbias) == 1 {
atomic.StoreInt32(&mu.rbias, 0)
for i := 0; i < len(mu.rslots); i++ {
if atomic.LoadInt32(&mu.rslots[i].mu) > 0 {
// There is a reader. Roll back.
atomic.StoreInt32(&mu.rbias, 1)
mu.rw.Unlock()
return false
}
}
mu.inhibitUntil = time.Now()
}
return true
}
return false
}

// Lock locks m for writing. If the lock is already locked for
// reading or writing, Lock blocks until the lock is available.
func (mu *RBMutex) Lock() {
Expand Down
160 changes: 149 additions & 11 deletions rbmutex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,55 @@ import (
)

func TestRBMutexSerialReader(t *testing.T) {
const numIters = 10
const numCalls = 10
mu := NewRBMutex()
var rtokens [numIters]*RToken
for i := 0; i < numIters; i++ {
rtokens[i] = mu.RLock()
for i := 0; i < 3; i++ {
var rtokens [numCalls]*RToken
for j := 0; j < numCalls; j++ {
rtokens[j] = mu.RLock()
}
for j := 0; j < numCalls; j++ {
mu.RUnlock(rtokens[j])
}
}
}

func TestRBMutexSerialOptimisticReader(t *testing.T) {
const numCalls = 10
mu := NewRBMutex()
for i := 0; i < 3; i++ {
var rtokens [numCalls]*RToken
for j := 0; j < numCalls; j++ {
ok, rt := mu.TryRLock()
if !ok {
t.Fatalf("TryRLock failed for %d", j)
}
if rt == nil {
t.Fatalf("nil reader token for %d", j)
}
rtokens[j] = rt
}
for j := 0; j < numCalls; j++ {
mu.RUnlock(rtokens[j])
}
}
for i := 0; i < numIters; i++ {
mu.RUnlock(rtokens[i])
}

func TestRBMutexSerialOptimisticWriter(t *testing.T) {
mu := NewRBMutex()
for i := 0; i < 3; i++ {
if !mu.TryLock() {
t.Fatal("TryLock failed")
}
mu.Unlock()
}
}

func parallelReader(mu *RBMutex, clocked, cunlock, cdone chan bool) {
tk := mu.RLock()
t := mu.RLock()
clocked <- true
<-cunlock
mu.RUnlock(tk)
mu.RUnlock(t)
cdone <- true
}

Expand Down Expand Up @@ -66,16 +98,16 @@ func TestRBMutexParallelReaders(t *testing.T) {

func reader(mu *RBMutex, numIterations int, activity *int32, cdone chan bool) {
for i := 0; i < numIterations; i++ {
tk := mu.RLock()
t := mu.RLock()
n := atomic.AddInt32(activity, 1)
if n < 1 || n >= 10000 {
mu.RUnlock(tk)
mu.RUnlock(t)
panic(fmt.Sprintf("rlock(%d)\n", n))
}
for i := 0; i < 100; i++ {
}
atomic.AddInt32(activity, -1)
mu.RUnlock(tk)
mu.RUnlock(t)
}
cdone <- true
}
Expand Down Expand Up @@ -132,6 +164,112 @@ func TestRBMutex(t *testing.T) {
hammerRBMutex(10, 5, n)
}

func optimisticReader(mu *RBMutex, numIterations int, activity *int32, cdone chan bool) {
for i := 0; i < numIterations; i++ {
if ok, t := mu.TryRLock(); ok {
n := atomic.AddInt32(activity, 1)
if n < 1 || n >= 10000 {
mu.RUnlock(t)
panic(fmt.Sprintf("rlock(%d)\n", n))
}
for i := 0; i < 100; i++ {
}
atomic.AddInt32(activity, -1)
mu.RUnlock(t)
}
}
cdone <- true
}

func optimisticWriter(mu *RBMutex, numIterations int, activity *int32, cdone chan bool) {
for i := 0; i < numIterations; i++ {
if mu.TryLock() {
n := atomic.AddInt32(activity, 10000)
if n != 10000 {
mu.Unlock()
panic(fmt.Sprintf("wlock(%d)\n", n))
}
for i := 0; i < 100; i++ {
}
atomic.AddInt32(activity, -10000)
mu.Unlock()
}
}
cdone <- true
}

func hammerOptimisticRBMutex(gomaxprocs, numReaders, numIterations int) {
runtime.GOMAXPROCS(gomaxprocs)
// Number of active readers + 10000 * number of active writers.
var activity int32
mu := NewRBMutex()
cdone := make(chan bool)
go optimisticWriter(mu, numIterations, &activity, cdone)
var i int
for i = 0; i < numReaders/2; i++ {
go optimisticReader(mu, numIterations, &activity, cdone)
}
go optimisticWriter(mu, numIterations, &activity, cdone)
for ; i < numReaders; i++ {
go optimisticReader(mu, numIterations, &activity, cdone)
}
// Wait for the 2 writers and all readers to finish.
for i := 0; i < 2+numReaders; i++ {
<-cdone
}
}

func TestRBMutex_Optimistic(t *testing.T) {
const n = 1000
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(0))
hammerOptimisticRBMutex(1, 1, n)
hammerOptimisticRBMutex(1, 3, n)
hammerOptimisticRBMutex(1, 10, n)
hammerOptimisticRBMutex(4, 1, n)
hammerOptimisticRBMutex(4, 3, n)
hammerOptimisticRBMutex(4, 10, n)
hammerOptimisticRBMutex(10, 1, n)
hammerOptimisticRBMutex(10, 3, n)
hammerOptimisticRBMutex(10, 10, n)
hammerOptimisticRBMutex(10, 5, n)
}

func hammerMixedRBMutex(gomaxprocs, numReaders, numIterations int) {
runtime.GOMAXPROCS(gomaxprocs)
// Number of active readers + 10000 * number of active writers.
var activity int32
mu := NewRBMutex()
cdone := make(chan bool)
go writer(mu, numIterations, &activity, cdone)
var i int
for i = 0; i < numReaders/2; i++ {
go reader(mu, numIterations, &activity, cdone)
}
go optimisticWriter(mu, numIterations, &activity, cdone)
for ; i < numReaders; i++ {
go optimisticReader(mu, numIterations, &activity, cdone)
}
// Wait for the 2 writers and all readers to finish.
for i := 0; i < 2+numReaders; i++ {
<-cdone
}
}

func TestRBMutex_Mixed(t *testing.T) {
const n = 1000
defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(0))
hammerMixedRBMutex(1, 1, n)
hammerMixedRBMutex(1, 3, n)
hammerMixedRBMutex(1, 10, n)
hammerMixedRBMutex(4, 1, n)
hammerMixedRBMutex(4, 3, n)
hammerMixedRBMutex(4, 10, n)
hammerMixedRBMutex(10, 1, n)
hammerMixedRBMutex(10, 3, n)
hammerMixedRBMutex(10, 10, n)
hammerMixedRBMutex(10, 5, n)
}

func benchmarkRBMutex(b *testing.B, parallelism, localWork, writeRatio int) {
mu := NewRBMutex()
b.SetParallelism(parallelism)
Expand Down

0 comments on commit f53fbc1

Please sign in to comment.