diff --git a/example_test.go b/example_test.go index 1c3115d..ddd309f 100644 --- a/example_test.go +++ b/example_test.go @@ -1,6 +1,7 @@ package xsync_test import ( + "errors" "fmt" "github.com/puzpuzpuz/xsync/v3" @@ -54,4 +55,17 @@ func ExampleMapOf_Compute() { }) // v: 84, ok: false fmt.Printf("v: %v, ok: %v\n", v, ok) + + // Propagate an error from the compute function to the outer scope. + var err error + v, ok = counts.Compute(42, func(oldValue int, loaded bool) (newValue int, delete bool) { + if oldValue == 42 { + err = errors.New("something went wrong") + return 0, true // no need to create a key/value pair + } + newValue = 0 + delete = false + return + }) + fmt.Printf("err: %v\n", err) } diff --git a/export_test.go b/export_test.go index 936f351..af6ae48 100644 --- a/export_test.go +++ b/export_test.go @@ -1,11 +1,11 @@ package xsync const ( - EntriesPerMapBucket = entriesPerMapBucket - MapLoadFactor = mapLoadFactor - MinMapTableLen = minMapTableLen - MinMapTableCap = minMapTableCap - MaxMapCounterLen = maxMapCounterLen + EntriesPerMapBucket = entriesPerMapBucket + MapLoadFactor = mapLoadFactor + DefaultMinMapTableLen = defaultMinMapTableLen + DefaultMinMapTableCap = defaultMinMapTableLen * entriesPerMapBucket + MaxMapCounterLen = maxMapCounterLen ) type ( diff --git a/map.go b/map.go index 1aa1c16..c5cd5e2 100644 --- a/map.go +++ b/map.go @@ -30,10 +30,8 @@ const ( // key-value pairs (this is a soft limit) mapLoadFactor = 0.75 // minimal table size, i.e. number of buckets; thus, minimal map - // capacity can be calculated as entriesPerMapBucket*minMapTableLen - minMapTableLen = 32 - // minimal table capacity - minMapTableCap = minMapTableLen * entriesPerMapBucket + // capacity can be calculated as entriesPerMapBucket*defaultMinMapTableLen + defaultMinMapTableLen = 32 // minimum counter stripes to use minMapCounterLen = 8 // maximum counter stripes to use; stands for around 4KB of memory @@ -76,6 +74,7 @@ type Map struct { resizeMu sync.Mutex // only used along with resizeCond resizeCond sync.Cond // used to wake up resize waiters (concurrent modifications) table unsafe.Pointer // *mapTable + minTableLen int } type mapTable struct { @@ -121,7 +120,7 @@ type rangeEntry struct { // NewMap creates a new Map instance. func NewMap() *Map { - return NewMapPresized(minMapTableCap) + return NewMapPresized(defaultMinMapTableLen * entriesPerMapBucket) } // NewMapPresized creates a new Map instance with capacity enough to hold @@ -130,19 +129,20 @@ func NewMapPresized(sizeHint int) *Map { m := &Map{} m.resizeCond = *sync.NewCond(&m.resizeMu) var table *mapTable - if sizeHint <= minMapTableCap { - table = newMapTable(minMapTableLen) + if sizeHint <= defaultMinMapTableLen*entriesPerMapBucket { + table = newMapTable(defaultMinMapTableLen) } else { tableLen := nextPowOf2(uint32(sizeHint / entriesPerMapBucket)) table = newMapTable(int(tableLen)) } + m.minTableLen = len(table.buckets) atomic.StorePointer(&m.table, unsafe.Pointer(table)) return m } -func newMapTable(tableLen int) *mapTable { - buckets := make([]bucketPadded, tableLen) - counterLen := tableLen >> 10 +func newMapTable(minTableLen int) *mapTable { + buckets := make([]bucketPadded, minTableLen) + counterLen := minTableLen >> 10 if counterLen < minMapCounterLen { counterLen = minMapCounterLen } else if counterLen > maxMapCounterLen { @@ -240,6 +240,11 @@ func (m *Map) LoadAndStore(key string, value interface{}) (actual interface{}, l // Otherwise, it computes the value using the provided function and // returns the computed value. The loaded result is true if the value // was loaded, false if stored. +// +// This call locks a hash table bucket while the compute function +// is executed. It means that modifications on other entries in +// the bucket will be blocked until the valueFn executes. Consider +// this when the function includes long-running operations. func (m *Map) LoadOrCompute(key string, valueFn func() interface{}) (actual interface{}, loaded bool) { return m.doCompute( key, @@ -258,6 +263,11 @@ func (m *Map) LoadOrCompute(key string, valueFn func() interface{}) (actual inte // The ok result indicates whether value was computed and stored, thus, is // present in the map. The actual result contains the new value in cases where // the value was computed and stored. See the example for a few use cases. +// +// This call locks a hash table bucket while the compute function +// is executed. It means that modifications on other entries in +// the bucket will be blocked until the valueFn executes. Consider +// this when the function includes long-running operations. func (m *Map) Compute( key string, valueFn func(oldValue interface{}, loaded bool) (newValue interface{}, delete bool), @@ -461,7 +471,7 @@ func (m *Map) resize(knownTable *mapTable, hint mapResizeHint) { // Fast path for shrink attempts. if hint == mapShrinkHint { shrinkThreshold := int64((knownTableLen * entriesPerMapBucket) / mapShrinkFraction) - if knownTableLen == minMapTableLen || knownTable.sumSize() > shrinkThreshold { + if knownTableLen == m.minTableLen || knownTable.sumSize() > shrinkThreshold { return } } @@ -481,7 +491,7 @@ func (m *Map) resize(knownTable *mapTable, hint mapResizeHint) { newTable = newMapTable(tableLen << 1) case mapShrinkHint: shrinkThreshold := int64((tableLen * entriesPerMapBucket) / mapShrinkFraction) - if tableLen > minMapTableLen && table.sumSize() <= shrinkThreshold { + if tableLen > m.minTableLen && table.sumSize() <= shrinkThreshold { // Shrink the table with factor of 2. atomic.AddInt64(&m.totalShrinks, 1) newTable = newMapTable(tableLen >> 1) @@ -494,7 +504,7 @@ func (m *Map) resize(knownTable *mapTable, hint mapResizeHint) { return } case mapClearHint: - newTable = newMapTable(minMapTableLen) + newTable = newMapTable(m.minTableLen) default: panic(fmt.Sprintf("unexpected resize hint: %d", hint)) } @@ -581,9 +591,10 @@ func isEmptyBucket(rootb *bucketPadded) bool { // may reflect any mapping for that key from any point during the // Range call. // -// It is safe to modify the map while iterating it. However, the -// concurrent modification rule apply, i.e. the changes may be not -// reflected in the subsequently iterated entries. +// It is safe to modify the map while iterating it, including entry +// creation, modification and deletion. However, the concurrent +// modification rule apply, i.e. the changes may be not reflected +// in the subsequently iterated entries. func (m *Map) Range(f func(key string, value interface{}) bool) { var zeroEntry rangeEntry // Pre-allocate array big enough to fit entries for most hash tables. diff --git a/map_test.go b/map_test.go index ae92b4b..2c5f868 100644 --- a/map_test.go +++ b/map_test.go @@ -496,8 +496,8 @@ func TestMapStoreThenParallelDelete_DoesNotShrinkBelowMinTableLen(t *testing.T) <-cdone stats := CollectMapStats(m) - if stats.RootBuckets < MinMapTableLen { - t.Fatalf("table was too small: %d", stats.RootBuckets) + if stats.RootBuckets != DefaultMinMapTableLen { + t.Fatalf("table length was different from the minimum: %d", stats.RootBuckets) } } @@ -573,10 +573,33 @@ func assertMapCapacity(t *testing.T, m *Map, expectedCap int) { } func TestNewMapPresized(t *testing.T) { - assertMapCapacity(t, NewMap(), MinMapTableCap) + assertMapCapacity(t, NewMap(), DefaultMinMapTableCap) assertMapCapacity(t, NewMapPresized(1000), 1536) - assertMapCapacity(t, NewMapPresized(0), MinMapTableCap) - assertMapCapacity(t, NewMapPresized(-1), MinMapTableCap) + assertMapCapacity(t, NewMapPresized(0), DefaultMinMapTableCap) + assertMapCapacity(t, NewMapPresized(-1), DefaultMinMapTableCap) +} + +func TestNewMapPresized_DoesNotShrinkBelowMinTableLen(t *testing.T) { + const minTableLen = 1024 + const numEntries = minTableLen * EntriesPerMapBucket + m := NewMapPresized(numEntries) + for i := 0; i < numEntries; i++ { + m.Store(strconv.Itoa(i), i) + } + + stats := CollectMapStats(m) + if stats.RootBuckets <= minTableLen { + t.Fatalf("table did not grow: %d", stats.RootBuckets) + } + + for i := 0; i < numEntries; i++ { + m.Delete(strconv.Itoa(int(i))) + } + + stats = CollectMapStats(m) + if stats.RootBuckets != minTableLen { + t.Fatalf("table length was different from the minimum: %d", stats.RootBuckets) + } } func TestMapResize(t *testing.T) { @@ -594,7 +617,7 @@ func TestMapResize(t *testing.T) { if stats.Capacity > expectedCapacity { t.Fatalf("capacity was too large: %d, expected: %d", stats.Capacity, expectedCapacity) } - if stats.RootBuckets <= MinMapTableLen { + if stats.RootBuckets <= DefaultMinMapTableLen { t.Fatalf("table was too small: %d", stats.RootBuckets) } if stats.TotalGrowths == 0 { @@ -618,7 +641,7 @@ func TestMapResize(t *testing.T) { if stats.Capacity != expectedCapacity { t.Fatalf("capacity was too large: %d, expected: %d", stats.Capacity, expectedCapacity) } - if stats.RootBuckets != MinMapTableLen { + if stats.RootBuckets != DefaultMinMapTableLen { t.Fatalf("table was too large: %d", stats.RootBuckets) } if stats.TotalShrinks == 0 { @@ -696,7 +719,7 @@ func parallelRandResizer(t *testing.T, m *Map, numIters, numEntries int, cdone c func TestMapParallelResize(t *testing.T) { const numIters = 1_000 - const numEntries = 2 * EntriesPerMapBucket * MinMapTableLen + const numEntries = 2 * EntriesPerMapBucket * DefaultMinMapTableLen m := NewMap() cdone := make(chan bool) go parallelRandResizer(t, m, numIters, numEntries, cdone) diff --git a/mapof.go b/mapof.go index 3f2c5fe..4388662 100644 --- a/mapof.go +++ b/mapof.go @@ -32,6 +32,7 @@ type MapOf[K comparable, V any] struct { resizeCond sync.Cond // used to wake up resize waiters (concurrent modifications) table unsafe.Pointer // *mapOfTable hasher func(K, uint64) uint64 + minTableLen int } type mapOfTable[K comparable, V any] struct { @@ -66,7 +67,7 @@ type entryOf[K comparable, V any] struct { // NewMapOf creates a new MapOf instance. func NewMapOf[K comparable, V any]() *MapOf[K, V] { - return NewMapOfPresized[K, V](minMapTableCap) + return NewMapOfPresized[K, V](defaultMinMapTableLen * entriesPerMapBucket) } // NewMapOfPresized creates a new MapOf instance with capacity enough @@ -84,19 +85,20 @@ func newMapOfPresized[K comparable, V any]( m.resizeCond = *sync.NewCond(&m.resizeMu) m.hasher = hasher var table *mapOfTable[K, V] - if sizeHint <= minMapTableCap { - table = newMapOfTable[K, V](minMapTableLen) + if sizeHint <= defaultMinMapTableLen*entriesPerMapBucket { + table = newMapOfTable[K, V](defaultMinMapTableLen) } else { tableLen := nextPowOf2(uint32(sizeHint / entriesPerMapBucket)) table = newMapOfTable[K, V](int(tableLen)) } + m.minTableLen = len(table.buckets) atomic.StorePointer(&m.table, unsafe.Pointer(table)) return m } -func newMapOfTable[K comparable, V any](tableLen int) *mapOfTable[K, V] { - buckets := make([]bucketOfPadded, tableLen) - counterLen := tableLen >> 10 +func newMapOfTable[K comparable, V any](minTableLen int) *mapOfTable[K, V] { + buckets := make([]bucketOfPadded, minTableLen) + counterLen := minTableLen >> 10 if counterLen < minMapCounterLen { counterLen = minMapCounterLen } else if counterLen > maxMapCounterLen { @@ -111,8 +113,8 @@ func newMapOfTable[K comparable, V any](tableLen int) *mapOfTable[K, V] { return t } -// Load returns the value stored in the map for a key, or nil if no -// value is present. +// Load returns the value stored in the map for a key, or zero value +// of type V if no value is present. // The ok result indicates whether value was found in the map. func (m *MapOf[K, V]) Load(key K) (value V, ok bool) { table := (*mapOfTable[K, V])(atomic.LoadPointer(&m.table)) @@ -190,6 +192,11 @@ func (m *MapOf[K, V]) LoadAndStore(key K, value V) (actual V, loaded bool) { // Otherwise, it computes the value using the provided function and // returns the computed value. The loaded result is true if the value // was loaded, false if stored. +// +// This call locks a hash table bucket while the compute function +// is executed. It means that modifications on other entries in +// the bucket will be blocked until the valueFn executes. Consider +// this when the function includes long-running operations. func (m *MapOf[K, V]) LoadOrCompute(key K, valueFn func() V) (actual V, loaded bool) { return m.doCompute( key, @@ -208,6 +215,11 @@ func (m *MapOf[K, V]) LoadOrCompute(key K, valueFn func() V) (actual V, loaded b // The ok result indicates whether value was computed and stored, thus, is // present in the map. The actual result contains the new value in cases where // the value was computed and stored. See the example for a few use cases. +// +// This call locks a hash table bucket while the compute function +// is executed. It means that modifications on other entries in +// the bucket will be blocked until the valueFn executes. Consider +// this when the function includes long-running operations. func (m *MapOf[K, V]) Compute( key K, valueFn func(oldValue V, loaded bool) (newValue V, delete bool), @@ -410,7 +422,7 @@ func (m *MapOf[K, V]) resize(knownTable *mapOfTable[K, V], hint mapResizeHint) { // Fast path for shrink attempts. if hint == mapShrinkHint { shrinkThreshold := int64((knownTableLen * entriesPerMapBucket) / mapShrinkFraction) - if knownTableLen == minMapTableLen || knownTable.sumSize() > shrinkThreshold { + if knownTableLen == m.minTableLen || knownTable.sumSize() > shrinkThreshold { return } } @@ -430,7 +442,7 @@ func (m *MapOf[K, V]) resize(knownTable *mapOfTable[K, V], hint mapResizeHint) { newTable = newMapOfTable[K, V](tableLen << 1) case mapShrinkHint: shrinkThreshold := int64((tableLen * entriesPerMapBucket) / mapShrinkFraction) - if tableLen > minMapTableLen && table.sumSize() <= shrinkThreshold { + if tableLen > m.minTableLen && table.sumSize() <= shrinkThreshold { // Shrink the table with factor of 2. atomic.AddInt64(&m.totalShrinks, 1) newTable = newMapOfTable[K, V](tableLen >> 1) @@ -443,7 +455,7 @@ func (m *MapOf[K, V]) resize(knownTable *mapOfTable[K, V], hint mapResizeHint) { return } case mapClearHint: - newTable = newMapOfTable[K, V](minMapTableLen) + newTable = newMapOfTable[K, V](m.minTableLen) default: panic(fmt.Sprintf("unexpected resize hint: %d", hint)) } @@ -497,9 +509,10 @@ func copyBucketOf[K comparable, V any]( // may reflect any mapping for that key from any point during the // Range call. // -// It is safe to modify the map while iterating it. However, the -// concurrent modification rule apply, i.e. the changes may be not -// reflected in the subsequently iterated entries. +// It is safe to modify the map while iterating it, including entry +// creation, modification and deletion. However, the concurrent +// modification rule apply, i.e. the changes may be not reflected +// in the subsequently iterated entries. func (m *MapOf[K, V]) Range(f func(key K, value V) bool) { var zeroPtr unsafe.Pointer // Pre-allocate array big enough to fit entries for most hash tables. diff --git a/mapof_test.go b/mapof_test.go index 43ea530..00cd2e9 100644 --- a/mapof_test.go +++ b/mapof_test.go @@ -541,8 +541,8 @@ func TestMapOfStoreThenParallelDelete_DoesNotShrinkBelowMinTableLen(t *testing.T <-cdone stats := CollectMapOfStats(m) - if stats.RootBuckets < MinMapTableLen { - t.Fatalf("table was too small: %d", stats.RootBuckets) + if stats.RootBuckets != DefaultMinMapTableLen { + t.Fatalf("table length was different from the minimum: %d", stats.RootBuckets) } } @@ -618,14 +618,37 @@ func assertMapOfCapacity[K comparable, V any](t *testing.T, m *MapOf[K, V], expe } func TestNewMapOfPresized(t *testing.T) { - assertMapOfCapacity(t, NewMapOf[string, string](), MinMapTableCap) - assertMapOfCapacity(t, NewMapOfPresized[string, string](0), MinMapTableCap) - assertMapOfCapacity(t, NewMapOfPresized[string, string](-100), MinMapTableCap) + assertMapOfCapacity(t, NewMapOf[string, string](), DefaultMinMapTableCap) + assertMapOfCapacity(t, NewMapOfPresized[string, string](0), DefaultMinMapTableCap) + assertMapOfCapacity(t, NewMapOfPresized[string, string](-100), DefaultMinMapTableCap) assertMapOfCapacity(t, NewMapOfPresized[string, string](500), 768) assertMapOfCapacity(t, NewMapOfPresized[int, int](1_000_000), 1_572_864) assertMapOfCapacity(t, NewMapOfPresized[point, point](100), 192) } +func TestNewMapOfPresized_DoesNotShrinkBelowMinTableLen(t *testing.T) { + const minTableLen = 1024 + const numEntries = minTableLen * EntriesPerMapBucket + m := NewMapOfPresized[int, int](numEntries) + for i := 0; i < numEntries; i++ { + m.Store(i, i) + } + + stats := CollectMapOfStats(m) + if stats.RootBuckets <= minTableLen { + t.Fatalf("table did not grow: %d", stats.RootBuckets) + } + + for i := 0; i < numEntries; i++ { + m.Delete(i) + } + + stats = CollectMapOfStats(m) + if stats.RootBuckets != minTableLen { + t.Fatalf("table length was different from the minimum: %d", stats.RootBuckets) + } +} + func TestMapOfResize(t *testing.T) { const numEntries = 100_000 m := NewMapOf[string, int]() @@ -641,7 +664,7 @@ func TestMapOfResize(t *testing.T) { if stats.Capacity > expectedCapacity { t.Fatalf("capacity was too large: %d, expected: %d", stats.Capacity, expectedCapacity) } - if stats.RootBuckets <= MinMapTableLen { + if stats.RootBuckets <= DefaultMinMapTableLen { t.Fatalf("table was too small: %d", stats.RootBuckets) } if stats.TotalGrowths == 0 { @@ -665,7 +688,7 @@ func TestMapOfResize(t *testing.T) { if stats.Capacity != expectedCapacity { t.Fatalf("capacity was too large: %d, expected: %d", stats.Capacity, expectedCapacity) } - if stats.RootBuckets != MinMapTableLen { + if stats.RootBuckets != DefaultMinMapTableLen { t.Fatalf("table was too large: %d", stats.RootBuckets) } if stats.TotalShrinks == 0 { @@ -743,7 +766,7 @@ func parallelRandTypedResizer(t *testing.T, m *MapOf[string, int], numIters, num func TestMapOfParallelResize(t *testing.T) { const numIters = 1_000 - const numEntries = 2 * EntriesPerMapBucket * MinMapTableLen + const numEntries = 2 * EntriesPerMapBucket * DefaultMinMapTableLen m := NewMapOf[string, int]() cdone := make(chan bool) go parallelRandTypedResizer(t, m, numIters, numEntries, cdone)